This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 72e6acc NIFI-9327: Added timewindow query to QueryNiFiReportingTask
and MetricsEventReportingTask
72e6acc is described below
commit 72e6accc1240f7a4e7e7ee1f658615349e9db0e1
Author: Lehel <[email protected]>
AuthorDate: Wed Oct 27 11:12:13 2021 +0200
NIFI-9327: Added timewindow query to QueryNiFiReportingTask and
MetricsEventReportingTask
---
.../reporting/sql/MetricsEventReportingTask.java | 29 +-
.../nifi/reporting/sql/QueryNiFiReportingTask.java | 23 +-
.../apache/nifi/reporting/sql/QueryTimeAware.java | 51 ++++
.../nifi/reporting/sql/util/TrackedQueryTime.java | 35 +++
.../additionalDetails.html | 29 ++
.../additionalDetails.html | 13 +
.../nifi/record/sink/MockRecordSinkService.java | 3 +-
.../sql/TestMetricsEventReportingTask.java | 304 ++++++++++++++++-----
.../reporting/sql/TestQueryNiFiReportingTask.java | 276 ++++++++++++++++---
.../rules/MockPropertyContextActionHandler.java | 27 +-
.../nifi/rules/engine/MockRulesEngineService.java | 13 +-
11 files changed, 648 insertions(+), 155 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
index 9a930ef..c3a0baa 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
@@ -31,19 +33,23 @@ import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@Tags({"reporting", "rules", "action", "action handler", "status",
"connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
@CapabilityDescription("Triggers rules-driven actions based on metrics values
")
-public class MetricsEventReportingTask extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's
last execution time so that on restart the task knows where it left off.")
+public class MetricsEventReportingTask extends AbstractReportingTask
implements QueryTimeAware {
private List<PropertyDescriptor> properties;
private MetricsQueryService metricsQueryService;
@@ -67,7 +73,7 @@ public class MetricsEventReportingTask extends
AbstractReportingTask {
}
@OnScheduled
- public void setup(final ConfigurationContext context) throws IOException {
+ public void setup(final ConfigurationContext context) {
actionHandler =
context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
rulesEngineService =
context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
final Integer defaultPrecision =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -77,17 +83,20 @@ public class MetricsEventReportingTask extends
AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
+ String sql =
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
try {
- final String query =
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
- fireRules(context, actionHandler, rulesEngineService, query);
+ sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME,
BULLETIN_END_TIME);
+ sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME,
PROVENANCE_END_TIME);
+
+ fireRules(context, actionHandler, rulesEngineService, sql);
} catch (Exception e) {
getLogger().error("Error opening loading rules: {}", new
Object[]{e.getMessage()}, e);
}
}
private void fireRules(ReportingContext context,
PropertyContextActionHandler actionHandler, RulesEngineService engine, String
query) throws Exception {
+ getLogger().debug("Executing query: {}", query);
QueryResult queryResult = metricsQueryService.query(context, query);
- getLogger().debug("Executing query: {}", new Object[]{ query });
ResultSetRecordSet recordSet =
metricsQueryService.getResultSetRecordSet(queryResult);
Record record;
try {
@@ -97,16 +106,14 @@ public class MetricsEventReportingTask extends
AbstractReportingTask {
facts.put(fieldName, record.getValue(fieldName));
}
List<Action> actions = engine.fireRules(facts);
- if(actions == null || actions.isEmpty()){
+ if (actions == null || actions.isEmpty()) {
getLogger().debug("No actions required for provided
facts.");
} else {
- actions.forEach(action -> {
- actionHandler.execute(context, action,facts);
- });
+ actions.forEach(action -> actionHandler.execute(context,
action, facts));
}
}
} finally {
metricsQueryService.closeQuietly(recordSet);
}
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index 367db30..61edaa2 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.reporting.sql;
+import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
@@ -29,7 +31,6 @@ import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +39,10 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
+import static
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
@@ -46,7 +51,8 @@ import static
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFA
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS,
CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions
or capabilities provided by Apache Calcite. Note that the "
+ "CONNECTION_STATUS_PREDICTIONS table is not available for querying
if analytics are not enabled (see the nifi.analytics.predict.enabled property
in nifi.properties). Attempting a "
+ "query on the table when the capability is disabled will cause an
error.")
-public class QueryNiFiReportingTask extends AbstractReportingTask {
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's
last execution time so that on restart the task knows where it left off.")
+public class QueryNiFiReportingTask extends AbstractReportingTask implements
QueryTimeAware {
private List<PropertyDescriptor> properties;
@@ -71,7 +77,7 @@ public class QueryNiFiReportingTask extends
AbstractReportingTask {
}
@OnScheduled
- public void setup(final ConfigurationContext context) throws IOException {
+ public void setup(final ConfigurationContext context) {
recordSinkService =
context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
final Integer defaultPrecision =
context.getProperty(VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
@@ -82,13 +88,16 @@ public class QueryNiFiReportingTask extends
AbstractReportingTask {
@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
+ String sql = context.getProperty(QueryMetricsUtil.QUERY).getValue();
try {
- final String sql =
context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
+ sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME,
BULLETIN_END_TIME);
+ sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME,
PROVENANCE_END_TIME);
+
+ getLogger().debug("Executing query: {}", sql);
final QueryResult queryResult = metricsQueryService.query(context,
sql);
final ResultSetRecordSet recordSet;
try {
- getLogger().debug("Executing query: {}", new Object[]{sql});
recordSet =
metricsQueryService.getResultSetRecordSet(queryResult);
} catch (final Exception e) {
getLogger().error("Error creating record set from query
results due to {}", new Object[]{e.getMessage()}, e);
@@ -110,10 +119,10 @@ public class QueryNiFiReportingTask extends
AbstractReportingTask {
metricsQueryService.closeQuietly(queryResult);
}
final long elapsedMillis =
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
- getLogger().debug("Successfully queried and sent in {} millis",
new Object[]{elapsedMillis});
+ getLogger().debug("Successfully queried and sent in {} millis",
elapsedMillis);
} catch (Exception e) {
getLogger().error("Error processing the query due to {}", new
Object[]{e.getMessage()}, e);
}
}
-
}
+
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
new file mode 100644
index 0000000..7d2f0b2
--- /dev/null
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryTimeAware.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+public interface QueryTimeAware {
+
+ default String processStartAndEndTimes(ReportingContext context, String
sql, TrackedQueryTime queryStartTime, TrackedQueryTime queryEndTime) throws
IOException {
+ StateManager stateManager = context.getStateManager();
+ final Map<String, String> stateMap = new
HashMap<>(stateManager.getState(Scope.LOCAL).toMap());
+
+ if (sql.contains(queryStartTime.getSqlPlaceholder()) &&
sql.contains(queryEndTime.getSqlPlaceholder())) {
+ final long startTime = stateMap.get(queryStartTime.name()) == null
? 0 : Long.parseLong(stateMap.get(queryStartTime.name()));
+ final long currentTime = getCurrentTime();
+
+ sql = sql.replace(queryStartTime.getSqlPlaceholder(),
String.valueOf(startTime));
+ sql = sql.replace(queryEndTime.getSqlPlaceholder(),
String.valueOf(currentTime));
+
+ stateMap.put(queryStartTime.name(), String.valueOf(currentTime));
+ stateManager.setState(stateMap, Scope.LOCAL);
+ }
+ return sql;
+ }
+
+ default long getCurrentTime() {
+ return Instant.now().toEpochMilli();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
new file mode 100644
index 0000000..cf46a29
--- /dev/null
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.util;
+
+public enum TrackedQueryTime {
+
+ BULLETIN_START_TIME("$bulletinStartTime"),
+ BULLETIN_END_TIME("$bulletinEndTime"),
+ PROVENANCE_START_TIME("$provenanceStartTime"),
+ PROVENANCE_END_TIME("$provenanceEndTime");
+
+ private final String sqlPlaceholder;
+
+ TrackedQueryTime(final String sqlPlaceholder) {
+ this.sqlPlaceholder = sqlPlaceholder;
+ }
+
+ public String getSqlPlaceholder() {
+ return sqlPlaceholder;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
index 2392aab..f29975f 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
@@ -29,6 +29,35 @@
A distinct ActionHandler can be used to service all events or an
ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally
action handler should be associated with the expected action types
returned from the rules engine.
</p>
+<p>
+ The reporting task can uniquely handle items from the bulletin and
provenance repositories. This means that an item will only be processed once
when the query is set to unique.
+ The query can be set to unique by defining a time window with special sql
placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime,
$provenanceEndTime)
+ that the reporting task will evaluate runtime. See the SQL Query Examples
section.
+</p>
+<br/><br/>
+<h2>SQL Query Examples</h2>
+<p>
+ <b>Example:</b> Select all fields from the <code>CONNECTION_STATUS</code>
table:<br/>
+<pre>SELECT * FROM CONNECTION_STATUS</pre>
+</p>
+<br/>
+<p>
+ <b>Example:</b> Select connection IDs where time-to-backpressure (based on
queue count) is less than 5 minutes:<br/>
+<pre>SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE
predictedTimeToCountBackpressureMillis < 300000</pre>
+</p>
+<br/>
+<p>
+ <b>Example:</b> Get the unique bulletin categories associated with
errors:<br/>
+<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel =
"ERROR"</pre>
+</p>
+<p>
+ <b>Example:</b> Select all fields from the <code>BULLETINS</code> table
with time window:<br/>
+<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND
bulletinTimestamp <= $bulletinEndTime</pre>
+</p>
+<p>
+ <b>Example:</b> Select all fields from the <code>PROVENANCE</code> table
with time window:<br/>
+<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime</pre>
+</p>
<br/>
</body>
</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
index 33baa0a..346834c 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.QueryNiFiReportingTask/additionalDetails.html
@@ -35,6 +35,11 @@
SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol)
or DatabaseRecordSink (for sending the
query result rows to an relational database).
</p>
+<p>
+ The reporting task can uniquely handle items from the bulletin and
provenance repositories. This means that an item will only be processed once
when the query is set to unique.
+ The query can be set to unique by defining a time window with special sql
placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime,
$provenanceEndTime)
+ that the reporting task will evaluate runtime. See the SQL Query Examples
section.
+</p>
<br/>
<h2>Table Definitions</h2>
<p>
@@ -221,6 +226,14 @@
<b>Example:</b> Get the unique bulletin categories associated with
errors:<br/>
<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel =
"ERROR"</pre>
</p>
+<p>
+ <b>Example:</b> Select all fields from the <code>BULLETINS</code> table
with time window:<br/>
+<pre>SELECT * from BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND
bulletinTimestamp <= $bulletinEndTime</pre>
+</p>
+<p>
+ <b>Example:</b> Select all fields from the <code>PROVENANCE</code> table
with time window:<br/>
+<pre>SELECT * from PROVENANCE where timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime</pre>
+</p>
<br/>
</body>
</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
index de2c799..a0f0155 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/record/sink/MockRecordSinkService.java
@@ -33,10 +33,11 @@ import java.util.Map;
public class MockRecordSinkService extends AbstractConfigurableComponent
implements RecordSinkService {
- private List<Map<String, Object>> rows = new ArrayList<>();;
+ private List<Map<String, Object>> rows = new ArrayList<>();
@Override
public WriteResult sendData(RecordSet recordSet, Map<String,String>
attributes, boolean sendZeroResults) throws IOException {
+ rows = new ArrayList<>();
int numRecordsWritten = 0;
RecordSchema recordSchema = recordSet.getSchema();
Record record;
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
index 6934005..196871b 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
@@ -16,10 +16,10 @@
*/
package org.apache.nifi.reporting.sql;
-import com.google.common.collect.Lists;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
@@ -27,21 +27,30 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinFactory;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.Severity;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
-import org.apache.nifi.rules.Action;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
import org.apache.nifi.rules.MockPropertyContextActionHandler;
import org.apache.nifi.rules.PropertyContextActionHandler;
import org.apache.nifi.rules.engine.MockRulesEngineService;
import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockBulletinRepository;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockPropertyValue;
-import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.db.JdbcProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -49,27 +58,34 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+
+class TestMetricsEventReportingTask {
-public class TestMetricsEventReportingTask {
private ReportingContext context;
private MockMetricsEventReportingTask reportingTask;
private MockPropertyContextActionHandler actionHandler;
- private MockRulesEngineService rulesEngineService;
private ProcessGroupStatus status;
+ private MockQueryBulletinRepository mockBulletinRepository;
+ private MockProvenanceRepository mockProvenanceRepository;
+ private AtomicLong currentTime;
+ private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
+ currentTime = new AtomicLong();
status = new ProcessGroupStatus();
actionHandler = new MockPropertyContextActionHandler();
status.setId("1234");
@@ -112,63 +128,153 @@ public class TestMetricsEventReportingTask {
rootConnectionStatuses.add(root1ConnectionStatus);
rootConnectionStatuses.add(root2ConnectionStatus);
status.setConnectionStatus(rootConnectionStatuses);
-
- // create a group status with processing time
- ProcessGroupStatus groupStatus1 = new ProcessGroupStatus();
- groupStatus1.setProcessorStatus(processorStatuses);
- groupStatus1.setBytesRead(1234L);
-
- // Create a nested group status with a connection
- ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
- groupStatus2.setProcessorStatus(processorStatuses);
- groupStatus2.setBytesRead(12345L);
- ConnectionStatus nestedConnectionStatus = new ConnectionStatus();
- nestedConnectionStatus.setId("nested");
- nestedConnectionStatus.setQueuedCount(1001);
- Collection<ConnectionStatus> nestedConnectionStatuses = new
ArrayList<>();
- nestedConnectionStatuses.add(nestedConnectionStatus);
- groupStatus2.setConnectionStatus(nestedConnectionStatuses);
- Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
- nestedGroupStatuses.add(groupStatus2);
- groupStatus1.setProcessGroupStatus(nestedGroupStatuses);
-
- ProcessGroupStatus groupStatus3 = new ProcessGroupStatus();
- groupStatus3.setBytesRead(1L);
- ConnectionStatus nestedConnectionStatus2 = new ConnectionStatus();
- nestedConnectionStatus2.setId("nested2");
- nestedConnectionStatus2.setQueuedCount(3);
- Collection<ConnectionStatus> nestedConnectionStatuses2 = new
ArrayList<>();
- nestedConnectionStatuses2.add(nestedConnectionStatus2);
- groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
- Collection<ProcessGroupStatus> nestedGroupStatuses2 = new
ArrayList<>();
- nestedGroupStatuses2.add(groupStatus3);
-
- Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
- groupStatuses.add(groupStatus1);
- groupStatuses.add(groupStatus3);
- status.setProcessGroupStatus(groupStatuses);
}
@Test
- public void testConnectionStatusTable() throws IOException,
InitializationException {
+ void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select connectionId,
predictedQueuedCount, predictedTimeToBytesBackpressureMillis from
CONNECTION_STATUS_PREDICTIONS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
- List<Map<String,Object>> metricsList = actionHandler.getRows();
- List<Tuple<String, Action>> defaultLogActions =
actionHandler.getDefaultActionsByType("LOG");
- List<Tuple<String, Action>> defaultAlertActions =
actionHandler.getDefaultActionsByType("ALERT");
List<PropertyContext> propertyContexts =
actionHandler.getPropertyContexts();
- assertFalse(metricsList.isEmpty());
- assertEquals(2,defaultLogActions.size());
- assertEquals(2,defaultAlertActions.size());
- assertEquals(4,propertyContexts.size());
+ assertEquals(2, actionHandler.getRows().size());
+ assertEquals(2, propertyContexts.size());
+ }
+
+ @Test
+ void testUniqueBulletinQueryIsInTimeWindow() throws
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+ assertEquals(1, actionHandler.getRows().size());
+
+ actionHandler.reset();
+ final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"WARN", "test bulletin 2", "testFlowFileUuid");
+ mockBulletinRepository.addBulletin(bulletin);
+ currentTime.set(bulletin.getTimestamp().getTime());
+ reportingTask.onTrigger(context);
+ assertEquals(1, actionHandler.getRows().size());
+ }
+
+ @Test
+ void testUniqueBulletinQueryIsOutOfTimeWindow() throws
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select bulletinCategory from
BULLETINS where bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <=
$bulletinEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+ assertEquals(1, actionHandler.getRows().size());
+
+ actionHandler.reset();
+ final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"WARN", "test bulletin 2", "testFlowFileUuid");
+ mockBulletinRepository.addBulletin(bulletin);
+ currentTime.set(bulletin.getTimestamp().getTime() - 1);
+ reportingTask.onTrigger(context);
+ assertEquals(0, actionHandler.getRows().size());
+ }
+
+ @Test
+ void testUniqueProvenanceQueryIsInTimeWindow() throws
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select componentId from
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+ assertEquals(1, actionHandler.getRows().size());
+
+ actionHandler.reset();
+
+ MockFlowFile mockFlowFile = new MockFlowFile(2L);
+ ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("2")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 2")
+ .setEventTime(Instant.now().toEpochMilli())
+ .setEventDuration(100)
+ .setTransitUri("test://")
+ .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
+ .setAlternateIdentifierUri("remote://test")
+ .build();
+ mockProvenanceRepository.registerEvent(prov2);
+
+ currentTime.set(prov2.getEventTime());
+ reportingTask.onTrigger(context);
+
+ assertEquals(1, actionHandler.getRows().size());
+ }
+
+ @Test
+ void testUniqueProvenanceQueryIsOutOfTimeWindow() throws
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select componentId from
PROVENANCE where timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+ assertEquals(1, actionHandler.getRows().size());
+
+ actionHandler.reset();
+
+ MockFlowFile mockFlowFile = new MockFlowFile(2L);
+ ProvenanceEventRecord prov2 = mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("2")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 2")
+ .setEventTime(Instant.now().toEpochMilli())
+ .setEventDuration(100)
+ .setTransitUri("test://")
+ .setSourceSystemFlowFileIdentifier("I am FlowFile 2")
+ .setAlternateIdentifierUri("remote://test")
+ .build();
+ mockProvenanceRepository.registerEvent(prov2);
+
+ currentTime.set(prov2.getEventTime() - 1);
+ reportingTask.onTrigger(context);
+
+ assertEquals(0, actionHandler.getRows().size());
}
- private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException, IOException {
+ @Test
+ void testTimeWindowFromStateMap() throws IOException,
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
+ "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp
<= $bulletinEndTime " +
+ "and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
+ reportingTask = initTask(properties);
+
+ long testBulletinStartTime = 1609538145L;
+ long testProvenanceStartTime = 1641074145L;
+ final Map<String, String> stateMap = new HashMap<>();
+ stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(),
String.valueOf(testBulletinStartTime));
+ stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(),
String.valueOf(testProvenanceStartTime));
+ mockStateManager.setState(stateMap, Scope.LOCAL);
+
+ final long bulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+ final long provenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+ assertEquals(testBulletinStartTime, bulletinStartTime);
+ assertEquals(testProvenanceStartTime, provenanceStartTime);
+
+ final long currentTime = Instant.now().toEpochMilli();
+ this.currentTime.set(currentTime);
+
+ reportingTask.onTrigger(context);
+
+ final long updatedBulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+ final long updatedProvenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+ assertEquals(currentTime, updatedBulletinStartTime);
+ assertEquals(currentTime, updatedProvenanceStartTime);
+ }
+
+ private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
- final BulletinRepository bulletinRepository =
Mockito.mock(BulletinRepository.class);
reportingTask = new MockMetricsEventReportingTask();
final ReportingInitializationContext initContext =
Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
@@ -183,9 +289,8 @@ public class TestMetricsEventReportingTask {
context = Mockito.mock(ReportingContext.class);
Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
- Mockito.when(context.getStateManager()).thenReturn(new
MockStateManager(reportingTask));
-
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
- Mockito.when(context.createBulletin(anyString(),any(Severity.class),
anyString())).thenReturn(null);
+ mockStateManager = new MockStateManager(reportingTask);
+ Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0,
PropertyDescriptor.class);
@@ -200,13 +305,8 @@ public class TestMetricsEventReportingTask {
actionHandler = new MockPropertyContextActionHandler();
Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
- Action action1 = new Action();
- action1.setType("LOG");
- Action action2 = new Action();
- action2.setType("ALERT");
-
final PropertyValue resValue =
Mockito.mock(StandardPropertyValue.class);
- rulesEngineService = new
MockRulesEngineService(Lists.newArrayList(action1,action2));
+ MockRulesEngineService rulesEngineService = new
MockRulesEngineService();
Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
ConfigurationContext configContext =
Mockito.mock(ConfigurationContext.class);
@@ -216,10 +316,82 @@ public class TestMetricsEventReportingTask {
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
MockPropertyValue("0"));
reportingTask.setup(configContext);
+ setupMockProvenanceRepository(eventAccess);
+ setupMockBulletinRepository();
+
return reportingTask;
}
- private static final class MockMetricsEventReportingTask extends
MetricsEventReportingTask {
+ private final class MockMetricsEventReportingTask extends
MetricsEventReportingTask {
+ @Override
+ public long getCurrentTime() {
+ return currentTime.get();
+ }
+ }
+
+ private void setupMockBulletinRepository() {
+ mockBulletinRepository = new MockQueryBulletinRepository();
+
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
"WARN", "test bulletin 1", "testFlowFileUuid"));
+
+
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
+ }
+
+ private void setupMockProvenanceRepository(final EventAccess eventAccess) {
+
+ mockProvenanceRepository = new MockProvenanceRepository();
+ long currentTimeMillis = System.currentTimeMillis();
+ Map<String, String> previousAttributes = new HashMap<>();
+ previousAttributes.put("mime.type", "application/json");
+ previousAttributes.put("test.value", "A");
+ Map<String, String> updatedAttributes = new
HashMap<>(previousAttributes);
+ updatedAttributes.put("test.value", "B");
+
+ // Generate provenance events and put them in a repository
+ Processor processor = mock(Processor.class);
+ SharedSessionState sharedState = new SharedSessionState(processor, new
AtomicLong(0));
+ MockProcessSession processSession = new
MockProcessSession(sharedState, processor);
+ MockFlowFile mockFlowFile = processSession.createFlowFile("Test
content".getBytes());
+
+ ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("1")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 1")
+ .setEventTime(currentTimeMillis)
+ .setEventDuration(100)
+ .setTransitUri("test://")
+ .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+ .setAlternateIdentifierUri("remote://test")
+ .setAttributes(previousAttributes, updatedAttributes)
+ .build();
+ mockProvenanceRepository.registerEvent(prov1);
+
+
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
+ }
+
+ private static class MockQueryBulletinRepository extends
MockBulletinRepository {
+ Map<String, List<Bulletin>> bulletins = new HashMap<>();
+
+ @Override
+ public void addBulletin(Bulletin bulletin) {
+ bulletins.computeIfAbsent(bulletin.getCategory(), key -> new
ArrayList<>())
+ .add(bulletin);
+ }
+
+ @Override
+ public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
+ return new ArrayList<>(
+
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
+ .orElse(Collections.emptyList())
+ );
+ }
+
+ @Override
+ public List<Bulletin> findBulletinsForController() {
+ return Optional.ofNullable(bulletins.get("controller"))
+ .orElse(Collections.emptyList());
+ }
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index 87d42fd..a031b1a 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -20,6 +20,7 @@ package org.apache.nifi.reporting.sql;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -34,12 +35,14 @@ import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.reporting.sql.util.TrackedQueryTime;
import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockBulletinRepository;
@@ -54,12 +57,14 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -67,21 +72,24 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-public class TestQueryNiFiReportingTask {
+class TestQueryNiFiReportingTask {
private ReportingContext context;
private MockQueryNiFiReportingTask reportingTask;
private MockRecordSinkService mockRecordSinkService;
private ProcessGroupStatus status;
+ private BulletinRepository mockBulletinRepository;
+ private MockProvenanceRepository mockProvenanceRepository;
+ private AtomicLong currentTime;
+ private MockStateManager mockStateManager;
@BeforeEach
public void setup() {
- mockRecordSinkService = new MockRecordSinkService();
+ currentTime = new AtomicLong();
status = new ProcessGroupStatus();
status.setId("1234");
status.setFlowFilesReceived(5);
@@ -145,8 +153,6 @@ public class TestQueryNiFiReportingTask {
Collection<ConnectionStatus> nestedConnectionStatuses2 = new
ArrayList<>();
nestedConnectionStatuses2.add(nestedConnectionStatus2);
groupStatus3.setConnectionStatus(nestedConnectionStatuses2);
- Collection<ProcessGroupStatus> nestedGroupStatuses2 = new
ArrayList<>();
- nestedGroupStatuses2.add(groupStatus3);
Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
groupStatuses.add(groupStatus1);
@@ -156,7 +162,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testConnectionStatusTable() throws IOException,
InitializationException {
+ void testConnectionStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select
id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by
queuedCount desc");
@@ -193,7 +199,191 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testJvmMetricsTable() throws IOException,
InitializationException {
+ void testBulletinIsInTimeWindow() throws InitializationException {
+ String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
+
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, query);
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+ assertEquals(3, rows.size());
+
+ final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
+ mockBulletinRepository.addBulletin(bulletin);
+ currentTime.set(bulletin.getTimestamp().getTime());
+
+ reportingTask.onTrigger(context);
+
+
+ List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+ assertEquals(1, sameRows.size());
+ }
+
+ @Test
+ void testBulletinIsOutOfTimeWindow() throws InitializationException {
+ String query = "select * from BULLETINS where bulletinTimestamp >
$bulletinStartTime and bulletinTimestamp <= $bulletinEndTime";
+
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, query);
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+ assertEquals(3, rows.size());
+
+ final Bulletin bulletin = BulletinFactory.createBulletin("input port",
"ERROR", "test bulletin 3", "testFlowFileUuid");
+ mockBulletinRepository.addBulletin(bulletin);
+ currentTime.set(bulletin.getTimestamp().getTime() - 1);
+
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+ assertEquals(0, sameRows.size());
+ }
+
+ @Test
+ void testProvenanceEventIsInTimeWindow() throws InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where
timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+ assertEquals(1001, rows.size());
+
+ MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+ ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("12345")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 1")
+ .setEventTime(Instant.now().toEpochMilli())
+ .setEventDuration(100)
+ .setTransitUri("test://")
+ .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+ .setAlternateIdentifierUri("remote://test")
+ .build();
+
+ mockProvenanceRepository.registerEvent(prov1002);
+
+ currentTime.set(prov1002.getEventTime());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+ assertEquals(1, sameRows.size());
+ }
+
+ @Test
+ void testProvenanceEventIsOutOfTimeWindow() throws InitializationException
{
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE where
timestampMillis > $provenanceStartTime and timestampMillis <=
$provenanceEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+ assertEquals(1001, rows.size());
+
+ MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+ ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("12345")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 1")
+ .setEventTime(Instant.now().toEpochMilli())
+ .setEventDuration(100)
+ .setTransitUri("test://")
+ .setSourceSystemFlowFileIdentifier("I am FlowFile 1")
+ .setAlternateIdentifierUri("remote://test")
+ .build();
+
+ mockProvenanceRepository.registerEvent(prov1002);
+
+ currentTime.set(prov1002.getEventTime() - 1);
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+ assertEquals(0, sameRows.size());
+ }
+
+ @Test
+ void testUniqueProvenanceAndBulletinQuery() throws InitializationException
{
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
+ "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp
<= $bulletinEndTime " +
+ "and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
+ reportingTask = initTask(properties);
+ currentTime.set(Instant.now().toEpochMilli());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+ assertEquals(3003, rows.size());
+
+ final Bulletin bulletin =
BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(),
"ERROR", "test bulletin 3", "testFlowFileUuid");
+ mockBulletinRepository.addBulletin(bulletin);
+
+ MockFlowFile mockFlowFile = new MockFlowFile(1002L);
+ ProvenanceEventRecord prov1002 =
mockProvenanceRepository.eventBuilder()
+ .setEventType(ProvenanceEventType.CREATE)
+ .fromFlowFile(mockFlowFile)
+ .setComponentId("12345")
+ .setComponentType("ReportingTask")
+ .setFlowFileUUID("I am FlowFile 1")
+ .build();
+
+ mockProvenanceRepository.registerEvent(prov1002);
+
+ currentTime.set(bulletin.getTimestamp().getTime());
+ reportingTask.onTrigger(context);
+
+ List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();
+ assertEquals(1, sameRows.size());
+ }
+
+ @Test
+ void testTimeWindowFromStateMap() throws IOException,
InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS,
PROVENANCE where " +
+ "bulletinTimestamp > $bulletinStartTime and bulletinTimestamp
<= $bulletinEndTime " +
+ "and timestampMillis > $provenanceStartTime and
timestampMillis <= $provenanceEndTime");
+ reportingTask = initTask(properties);
+
+ long testBulletinStartTime = 1609538145L;
+ long testProvenanceStartTime = 1641074145L;
+ final Map<String, String> stateMap = new HashMap<>();
+ stateMap.put(TrackedQueryTime.BULLETIN_START_TIME.name(),
String.valueOf(testBulletinStartTime));
+ stateMap.put(TrackedQueryTime.PROVENANCE_START_TIME.name(),
String.valueOf(testProvenanceStartTime));
+ mockStateManager.setState(stateMap, Scope.LOCAL);
+
+ final long bulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+ final long provenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+ assertEquals(testBulletinStartTime, bulletinStartTime);
+ assertEquals(testProvenanceStartTime, provenanceStartTime);
+
+ final long currentTime = Instant.now().toEpochMilli();
+ this.currentTime.set(currentTime);
+
+ reportingTask.onTrigger(context);
+
+ final long updatedBulletinStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.BULLETIN_START_TIME.name()));
+ final long updatedProvenanceStartTime =
Long.parseLong(context.getStateManager().getState(Scope.LOCAL).get(TrackedQueryTime.PROVENANCE_START_TIME.name()));
+
+ assertEquals(currentTime, updatedBulletinStartTime);
+ assertEquals(currentTime, updatedProvenanceStartTime);
+ }
+
+ @Test
+ void testJvmMetricsTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select "
@@ -222,7 +412,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testProcessGroupStatusTable() throws IOException,
InitializationException {
+ void testProcessGroupStatusTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
PROCESS_GROUP_STATUS order by bytesRead asc");
@@ -247,7 +437,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testNoResults() throws IOException, InitializationException {
+ void testNoResults() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from
CONNECTION_STATUS where queuedCount > 2000");
@@ -259,7 +449,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testProvenanceTable() throws IOException,
InitializationException {
+ void testProvenanceTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from PROVENANCE order
by eventId asc");
@@ -304,7 +494,7 @@ public class TestQueryNiFiReportingTask {
}
@Test
- public void testBulletinTable() throws IOException,
InitializationException {
+ void testBulletinTable() throws InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS order
by bulletinTimestamp asc");
@@ -329,12 +519,12 @@ public class TestQueryNiFiReportingTask {
// Validate the third row
row = rows.get(2);
- assertEquals("controller service", row.get("bulletinCategory"));
+ assertEquals("controller_service", row.get("bulletinCategory"));
assertEquals("ERROR", row.get("bulletinLevel"));
assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
}
- private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException, IOException {
+ private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
reportingTask = new MockQueryNiFiReportingTask();
@@ -342,6 +532,7 @@ public class TestQueryNiFiReportingTask {
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
+
Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor :
reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
@@ -349,7 +540,10 @@ public class TestQueryNiFiReportingTask {
properties.putAll(customProperties);
context = mock(ReportingContext.class);
- Mockito.when(context.getStateManager()).thenReturn(new
MockStateManager(reportingTask));
+
+ mockStateManager = new MockStateManager(reportingTask);
+
+ Mockito.when(context.getStateManager()).thenReturn(mockStateManager);
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0,
PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
@@ -371,7 +565,7 @@ public class TestQueryNiFiReportingTask {
Mockito.when(configContext.getProperty(JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE)).thenReturn(new
MockPropertyValue("0"));
reportingTask.setup(configContext);
- MockProvenanceRepository provenanceRepository = new
MockProvenanceRepository();
+ mockProvenanceRepository = new MockProvenanceRepository();
long currentTimeMillis = System.currentTimeMillis();
Map<String, String> previousAttributes = new HashMap<>();
previousAttributes.put("mime.type", "application/json");
@@ -385,7 +579,7 @@ public class TestQueryNiFiReportingTask {
MockProcessSession processSession = new
MockProcessSession(sharedState, processor);
MockFlowFile mockFlowFile = processSession.createFlowFile("Test
content".getBytes());
- ProvenanceEventRecord prov1 = provenanceRepository.eventBuilder()
+ ProvenanceEventRecord prov1 = mockProvenanceRepository.eventBuilder()
.setEventType(ProvenanceEventType.CREATE)
.fromFlowFile(mockFlowFile)
.setComponentId("12345")
@@ -399,12 +593,12 @@ public class TestQueryNiFiReportingTask {
.setAttributes(previousAttributes, updatedAttributes)
.build();
- provenanceRepository.registerEvent(prov1);
+ mockProvenanceRepository.registerEvent(prov1);
for (int i = 1; i < 1001; i++) {
String indexString = Integer.toString(i);
mockFlowFile = processSession.createFlowFile(("Test content " +
indexString).getBytes());
- ProvenanceEventRecord prov = provenanceRepository.eventBuilder()
+ ProvenanceEventRecord prov =
mockProvenanceRepository.eventBuilder()
.fromFlowFile(mockFlowFile)
.setEventType(ProvenanceEventType.DROP)
.setComponentId(indexString)
@@ -412,52 +606,48 @@ public class TestQueryNiFiReportingTask {
.setFlowFileUUID("I am FlowFile " + indexString)
.setEventTime(currentTimeMillis - i)
.build();
- provenanceRepository.registerEvent(prov);
+ mockProvenanceRepository.registerEvent(prov);
}
-
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(mockProvenanceRepository);
+
+ mockBulletinRepository = new MockQueryBulletinRepository();
+
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin("controller",
"WARN", "test bulletin 2", "testFlowFileUuid"));
+
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.PROCESSOR.name().toLowerCase(),
"INFO", "test bulletin 1", "testFlowFileUuid"));
+
mockBulletinRepository.addBulletin(BulletinFactory.createBulletin(ComponentType.CONTROLLER_SERVICE.name().toLowerCase(),
"ERROR", "test bulletin 2", "testFlowFileUuid"));
- MockBulletinRepository bulletinRepository = new
MockQueryBulletinRepository();
-
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller",
"WARN", "test bulletin 2", "testFlowFileUuid"));
-
bulletinRepository.addBulletin(BulletinFactory.createBulletin("processor",
"INFO", "test bulletin 1", "testFlowFileUuid"));
-
bulletinRepository.addBulletin(BulletinFactory.createBulletin("controller
service", "ERROR", "test bulletin 2", "testFlowFileUuid"));
-
Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
+
Mockito.when(context.getBulletinRepository()).thenReturn(mockBulletinRepository);
return reportingTask;
}
- private static final class MockQueryNiFiReportingTask extends
QueryNiFiReportingTask {
+ private final class MockQueryNiFiReportingTask extends
QueryNiFiReportingTask {
+ @Override
+ public long getCurrentTime() {
+ return currentTime.get();
+ }
}
private static class MockQueryBulletinRepository extends
MockBulletinRepository {
-
- List<Bulletin> bulletinList;
-
-
- public MockQueryBulletinRepository() {
- bulletinList = new ArrayList<>();
- }
+ Map<String, List<Bulletin>> bulletins = new HashMap<>();
@Override
public void addBulletin(Bulletin bulletin) {
- bulletinList.add(bulletin);
+ bulletins.computeIfAbsent(bulletin.getCategory(), __ -> new
ArrayList<>())
+ .add(bulletin);
}
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
- if (bulletinQuery.getSourceType().equals(ComponentType.PROCESSOR))
{
- return Collections.singletonList(bulletinList.get(1));
- } else if
(bulletinQuery.getSourceType().equals(ComponentType.CONTROLLER_SERVICE)) {
- return Collections.singletonList(bulletinList.get(2));
- } else {
- return Collections.emptyList();
- }
+ return new ArrayList<>(
+
Optional.ofNullable(bulletins.get(bulletinQuery.getSourceType().name().toLowerCase()))
+ .orElse(Collections.emptyList()));
}
@Override
public List<Bulletin> findBulletinsForController() {
- return Collections.singletonList(bulletinList.get(0));
+ return Optional.ofNullable(bulletins.get("controller"))
+ .orElse(Collections.emptyList());
}
-
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
index 323317d..dbbb50a 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
@@ -20,18 +20,14 @@ import
org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-public class MockPropertyContextActionHandler extends
AbstractConfigurableComponent implements PropertyContextActionHandler{
-
- private List<Map<String, Object>> rows = new ArrayList<>();
- private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
- private List<PropertyContext> propertyContexts = new ArrayList<>();
+public class MockPropertyContextActionHandler extends
AbstractConfigurableComponent implements PropertyContextActionHandler {
+ private final List<Map<String, Object>> rows = new ArrayList<>();
+ private final List<PropertyContext> propertyContexts = new ArrayList<>();
@Override
@@ -43,7 +39,6 @@ public class MockPropertyContextActionHandler extends
AbstractConfigurableCompon
@Override
public void execute(Action action, Map<String, Object> facts) {
rows.add(facts);
- defaultActions.add( new Tuple<>(action.getType(),action));
}
@@ -56,15 +51,6 @@ public class MockPropertyContextActionHandler extends
AbstractConfigurableCompon
return rows;
}
- public List<Tuple<String, Action>> getDefaultActions() {
- return defaultActions;
- }
-
- public List<Tuple<String,Action>> getDefaultActionsByType(final String
type){
- return defaultActions.stream().filter(stringActionTuple ->
stringActionTuple
- .getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
- }
-
public List<PropertyContext> getPropertyContexts() {
return propertyContexts;
}
@@ -73,4 +59,9 @@ public class MockPropertyContextActionHandler extends
AbstractConfigurableCompon
public String getIdentifier() {
return "MockPropertyContextActionHandler";
}
-}
+
+ public void reset() {
+ rows.clear();
+ propertyContexts.clear();
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
index e3ccc73..90e8472 100644
---
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
+++
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
@@ -18,26 +18,21 @@ package org.apache.nifi.rules.engine;
import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.rules.Action;
+import org.mockito.Mockito;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MockRulesEngineService extends AbstractConfigurableComponent
implements RulesEngineService {
- private List<Action> actions;
-
- public MockRulesEngineService(List<Action> actions) {
- this.actions = actions;
- }
-
@Override
public List<Action> fireRules(Map<String, Object> facts) {
- return actions;
+ return Collections.singletonList(Mockito.mock(Action.class));
}
@Override
- public void initialize(ControllerServiceInitializationContext context)
throws InitializationException {
+ public void initialize(ControllerServiceInitializationContext context) {
}
@Override