This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 09bc5bcb5a988f08d1c703ca3c4476f3f23aeb61 Author: Matthew Burgess <[email protected]> AuthorDate: Tue Oct 25 15:19:59 2022 -0400 NIFI-10699: Add FLOW_CONFIG_HISTORY table to QueryNiFiReportingTask This closes #6581 Signed-off-by: David Handermann <[email protected]> --- .../nifi-sql-reporting-tasks/pom.xml | 6 + .../nifi/reporting/sql/MetricsSqlQueryService.java | 3 + .../nifi/reporting/sql/QueryNiFiReportingTask.java | 11 +- .../FlowConfigHistoryEnumerator.java | 151 +++++++++++++++ .../FlowConfigHistoryProjectTableScanRule.java | 76 ++++++++ .../flowconfighistory/FlowConfigHistoryTable.java | 211 +++++++++++++++++++++ .../FlowConfigHistoryTableScan.java | 91 +++++++++ .../nifi/reporting/sql/util/TrackedQueryTime.java | 4 +- .../reporting/sql/TestQueryNiFiReportingTask.java | 43 +++++ 9 files changed, 591 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml index 144c31fca6..97c188f707 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml @@ -91,6 +91,12 @@ <version>1.19.0-SNAPSHOT</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-user-actions</artifactId> + <version>1.19.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-rules-engine-service-api</artifactId> diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java index 588da4aa85..09b3be0f4a 100644 --- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java @@ -30,6 +30,7 @@ import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.sql.bulletins.BulletinTable; import org.apache.nifi.reporting.sql.connectionstatus.ConnectionStatusTable; import org.apache.nifi.reporting.sql.connectionstatuspredictions.ConnectionStatusPredictionsTable; +import org.apache.nifi.reporting.sql.flowconfighistory.FlowConfigHistoryTable; import org.apache.nifi.reporting.sql.metrics.JvmMetricsTable; import org.apache.nifi.reporting.sql.processgroupstatus.ProcessGroupStatusTable; import org.apache.nifi.reporting.sql.processorstatus.ProcessorStatusTable; @@ -169,6 +170,8 @@ public class MetricsSqlQueryService implements MetricsQueryService { rootSchema.add("BULLETINS", bulletinTable); final ProvenanceTable provenanceTable = new ProvenanceTable(context, getLogger()); rootSchema.add("PROVENANCE", provenanceTable); + final FlowConfigHistoryTable flowConfigHistoryTable = new FlowConfigHistoryTable(context, getLogger()); + rootSchema.add("FLOW_CONFIG_HISTORY", flowConfigHistoryTable); rootSchema.setCacheEnabled(false); diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java index 61edaa2773..3d1b0b1a26 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java @@ -41,16 
+41,18 @@ import java.util.concurrent.TimeUnit; import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME; import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME; +import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_END_TIME; +import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_START_TIME; import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME; import static org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME; import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION; import static org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE; -@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "provenance", "record", "sql"}) +@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "provenance", "record", "sql", "flow", "config"}) @CapabilityDescription("Publishes NiFi status information based on the results of a user-specified SQL query. The query may make use of the CONNECTION_STATUS, PROCESSOR_STATUS, " - + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions or capabilities provided by Apache Calcite. Note that the " - + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property in nifi.properties). Attempting a " - + "query on the table when the capability is disabled will cause an error.") + + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, FLOW_CONFIG_HISTORY, or PROVENANCE tables, and can use any functions or capabilities provided by " + + "Apache Calcite. 
Note that the CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled (see the nifi.analytics.predict.enabled property " + + "in nifi.properties). Attempting a query on the table when the capability is disabled will cause an error.") @Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last execution time so that on restart the task knows where it left off.") public class QueryNiFiReportingTask extends AbstractReportingTask implements QueryTimeAware { @@ -92,6 +94,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask implements Que try { sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, BULLETIN_END_TIME); sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, PROVENANCE_END_TIME); + sql = processStartAndEndTimes(context, sql, FLOW_CONFIG_HISTORY_START_TIME, FLOW_CONFIG_HISTORY_END_TIME); getLogger().debug("Executing query: {}", sql); final QueryResult queryResult = metricsQueryService.query(context, sql); diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java new file mode 100644 index 0000000000..7968a233cc --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting.sql.flowconfighistory; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.nifi.action.Action; +import org.apache.nifi.action.details.ConfigureDetails; +import org.apache.nifi.action.details.ConnectDetails; +import org.apache.nifi.action.details.MoveDetails; +import org.apache.nifi.action.details.PurgeDetails; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.ReportingContext; + +import java.util.Iterator; +import java.util.List; + +public class FlowConfigHistoryEnumerator implements Enumerator<Object> { + private final ReportingContext context; + private final ComponentLog logger; + private final int[] fields; + + private Iterator<Action> actionIterator; + private Object currentRow; + private int recordsRead = 0; + + public FlowConfigHistoryEnumerator(final ReportingContext context, final ComponentLog logger, final int[] fields) { + this.context = context; + this.logger = logger; + this.fields = fields; + reset(); + } + + @Override + public Object current() { + return currentRow; + } + + @Override + public boolean moveNext() { + currentRow = null; + + if (!actionIterator.hasNext()) { + // If we are out of data, close the InputStream. We do this because + // Calcite does not necessarily call our close() method. 
+ close(); + try { + onFinish(); + } catch (final Exception e) { + logger.error("Failed to perform tasks when enumerator was finished", e); + } + + return false; + } + + final Action action = actionIterator.next(); + currentRow = filterColumns(action); + + recordsRead++; + return true; + } + + protected int getRecordsRead() { + return recordsRead; + } + + protected void onFinish() { + } + + private Object filterColumns(final Action action) { + if (action == null) { + return null; + } + + final boolean isClustered = context.isClustered(); + String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + nodeId = "unknown"; + } + + final Object[] row = new Object[]{ + action.getId(), + action.getTimestamp().getTime(), + action.getUserIdentity(), + action.getSourceId(), + action.getSourceName(), + action.getSourceType(), + action.getOperation().toString(), + action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getName() : null, + action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getPreviousValue() : null, + action.getActionDetails() instanceof ConfigureDetails ? ((ConfigureDetails) action.getActionDetails()).getValue() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceId() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceName() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getSourceType() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getDestinationId() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getDestinationName() : null, + action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getDestinationType() : null, + action.getActionDetails() instanceof ConnectDetails ? ((ConnectDetails) action.getActionDetails()).getRelationship() : null, + action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getGroup() : null, + action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getGroupId() : null, + action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getPreviousGroup() : null, + action.getActionDetails() instanceof MoveDetails ? ((MoveDetails) action.getActionDetails()).getPreviousGroupId() : null, + action.getActionDetails() instanceof PurgeDetails ? ((PurgeDetails) action.getActionDetails()).getEndDate().getTime() : null + }; + + // If we want no fields just return null + if (fields == null) { + return row; + } + + // If we want only a single field, then Calcite is going to expect us to return + // the actual value, NOT a 1-element array of values. + if (fields.length == 1) { + final int desiredCellIndex = fields[0]; + return row[desiredCellIndex]; + } + + // Create a new Object array that contains only the desired fields. 
+ final Object[] filtered = new Object[fields.length]; + for (int i = 0; i < fields.length; i++) { + final int indexToKeep = fields[i]; + filtered[i] = row[indexToKeep]; + } + + return filtered; + } + + @Override + public void reset() { + List<Action> fullFlowConfigHistoryList = context.getEventAccess().getFlowChanges(0, Short.MAX_VALUE); + actionIterator = fullFlowConfigHistoryList.iterator(); + } + + @Override + public void close() { + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java new file mode 100644 index 0000000000..9f33e4f00d --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.reporting.sql.flowconfighistory; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +/** + * Planner rule that projects from a {@link FlowConfigHistoryTableScan} scan just the columns + * needed to satisfy a projection. If the projection's expressions are trivial, + * the projection is removed. + */ +public class FlowConfigHistoryProjectTableScanRule extends RelOptRule { + public static final FlowConfigHistoryProjectTableScanRule INSTANCE = new FlowConfigHistoryProjectTableScanRule(); + + private FlowConfigHistoryProjectTableScanRule() { + super( + operand(LogicalProject.class, + operand(FlowConfigHistoryTableScan.class, none())), + "BulletinProjectTableScanRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LogicalProject project = call.rel(0); + final FlowConfigHistoryTableScan scan = call.rel(1); + final int[] fields = getProjectFields(project.getProjects()); + + if (fields == null) { + // Project contains expressions more complex than just field references. 
+ return; + } + + call.transformTo( + new FlowConfigHistoryTableScan( + scan.getCluster(), + scan.getTable(), + scan.flowConfigHistoryTable, + fields)); + } + + private int[] getProjectFields(List<RexNode> exps) { + final int[] fields = new int[exps.size()]; + + for (int i = 0; i < exps.size(); i++) { + final RexNode exp = exps.get(i); + + if (exp instanceof RexInputRef) { + fields[i] = ((RexInputRef) exp).getIndex(); + } else { + return null; // not a simple projection + } + } + + return fields; + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java new file mode 100644 index 0000000000..c5af557030 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.reporting.sql.flowconfighistory; + +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.Schema.TableType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.util.Pair; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.ReportingContext; + +import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +public class FlowConfigHistoryTable extends AbstractTable implements QueryableTable, TranslatableTable { + + private final ComponentLog logger; + + private RelDataType relDataType = null; + + private final ReportingContext context; + private volatile int maxRecordsRead; + + private final Set<FlowConfigHistoryEnumerator> enumerators = new HashSet<>(); + + /** + * Creates a Flow Configuration History table. + */ + public FlowConfigHistoryTable(final ReportingContext context, final ComponentLog logger) { + this.context = context; + this.logger = logger; + } + + @Override + public String toString() { + return "FlowConfigHistoryTable"; + } + + public void close() { + synchronized (enumerators) { + for (final FlowConfigHistoryEnumerator enumerator : enumerators) { + enumerator.close(); + } + } + } + + /** + * Returns an enumerable over a given projection of the fields. 
+ * + * <p> + * Called from generated code. + */ + public Enumerable<Object> project(final int[] fields) { + return new AbstractEnumerable<Object>() { + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public Enumerator<Object> enumerator() { + final FlowConfigHistoryEnumerator flowConfigHistoryEnumerator = new FlowConfigHistoryEnumerator(context, logger, fields) { + @Override + protected void onFinish() { + final int recordCount = getRecordsRead(); + if (recordCount > maxRecordsRead) { + maxRecordsRead = recordCount; + } + } + + @Override + public void close() { + synchronized (enumerators) { + enumerators.remove(this); + } + super.close(); + } + }; + + synchronized (enumerators) { + enumerators.add(flowConfigHistoryEnumerator); + } + + return flowConfigHistoryEnumerator; + } + }; + } + + public int getRecordsRead() { + return maxRecordsRead; + } + + @Override + @SuppressWarnings("rawtypes") + public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + @Override + public Type getElementType() { + return Object[].class; + } + + @Override + public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) { + throw new UnsupportedOperationException(); + } + + @Override + public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) { + // Request all fields. 
+ final int fieldCount = relOptTable.getRowType().getFieldCount(); + final int[] fields = new int[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + fields[i] = i; + } + + return new FlowConfigHistoryTableScan(context.getCluster(), relOptTable, this, fields); + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + if (relDataType != null) { + return relDataType; + } + + final List<String> names = Arrays.asList( + "actionId", + "actionTimestamp", + "actionUserIdentity", + "actionSourceId", + "actionSourceName", + "actionSourceType", + "actionOperation", + "configureDetailsName", + "configureDetailsPreviousValue", + "configureDetailsValue", + "connectionSourceId", + "connectionSourceName", + "connectionSourceType", + "connectionDestinationId", + "connectionDestinationName", + "connectionDestinationType", + "connectionRelationship", + "moveGroup", + "moveGroupId", + "movePreviousGroup", + "movePreviousGroupId", + "purgeEndDate" + ); + final List<RelDataType> types = Arrays.asList( + typeFactory.createJavaType(int.class), + typeFactory.createJavaType(long.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(String.class), + typeFactory.createJavaType(long.class) + ); 
+ + relDataType = typeFactory.createStructType(Pair.zip(names, types)); + return relDataType; + } + + @Override + public TableType getJdbcTableType() { + return TableType.TEMPORARY_TABLE; + } +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java new file mode 100644 index 0000000000..b5ae01e6f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.reporting.sql.flowconfighistory; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; + +import java.util.List; + +/** + * Relational expression representing a query for flow configuration history information. + * + * <p> + * Like any table scan, it serves as a leaf node of a query tree. 
+ * </p> + */ +public class FlowConfigHistoryTableScan extends TableScan implements EnumerableRel { + final FlowConfigHistoryTable flowConfigHistoryTable; + final int[] fields; + + protected FlowConfigHistoryTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowConfigHistoryTable flowConfigHistoryTable, final int[] fields) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table); + + this.flowConfigHistoryTable = flowConfigHistoryTable; + this.fields = fields; + } + + @Override + public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) { + return new FlowConfigHistoryTableScan(getCluster(), table, flowConfigHistoryTable, fields); + } + + @Override + public RelWriter explainTerms(final RelWriter pw) { + return super.explainTerms(pw).item("fields", Primitive.asList(fields)); + } + + @Override + public RelDataType deriveRowType() { + final List<RelDataTypeField> fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override + public void register(RelOptPlanner planner) { + planner.addRule(FlowConfigHistoryProjectTableScanRule.INSTANCE); + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); + + return implementor.result(physType, Blocks.toBlock( + Expressions.call(table.getExpression(FlowConfigHistoryTable.class), "project", Expressions.constant(fields)))); + } + +} diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java index cf46a291a4..a9c003125e 
100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java @@ -21,7 +21,9 @@ public enum TrackedQueryTime { BULLETIN_START_TIME("$bulletinStartTime"), BULLETIN_END_TIME("$bulletinEndTime"), PROVENANCE_START_TIME("$provenanceStartTime"), - PROVENANCE_END_TIME("$provenanceEndTime"); + PROVENANCE_END_TIME("$provenanceEndTime"), + FLOW_CONFIG_HISTORY_START_TIME("$flowConfigHistoryStartTime"), + FLOW_CONFIG_HISTORY_END_TIME("$flowConfigHistoryEndTime"); private final String sqlPlaceholder; diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index 286cf07067..6b0f1c36a6 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -17,6 +17,11 @@ package org.apache.nifi.reporting.sql; +import org.apache.nifi.action.Action; +import org.apache.nifi.action.Component; +import org.apache.nifi.action.FlowChangeAction; +import org.apache.nifi.action.Operation; +import org.apache.nifi.action.details.FlowChangeConfigureDetails; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -61,6 +66,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; 
import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; class TestQueryNiFiReportingTask { @@ -86,6 +93,7 @@ class TestQueryNiFiReportingTask { private MockProvenanceRepository mockProvenanceRepository; private AtomicLong currentTime; private MockStateManager mockStateManager; + private List<Action> flowConfigHistory; @BeforeEach public void setup() { @@ -105,6 +113,7 @@ class TestQueryNiFiReportingTask { // create a processor status with processing time ProcessorStatus procStatus = new ProcessorStatus(); procStatus.setId("proc"); + procStatus.setName("Processor 1"); procStatus.setProcessingNanos(123456789); Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); @@ -159,6 +168,21 @@ class TestQueryNiFiReportingTask { groupStatuses.add(groupStatus3); status.setProcessGroupStatus(groupStatuses); + // Populate flow config history + FlowChangeAction action1 = new FlowChangeAction(); + action1.setId(123); + action1.setTimestamp(new Date()); + action1.setUserIdentity("test"); + action1.setSourceId("proc"); + action1.setSourceName("Processor 1"); + action1.setSourceType(Component.Processor); + action1.setOperation(Operation.Configure); + FlowChangeConfigureDetails configureDetails1 = new FlowChangeConfigureDetails(); + configureDetails1.setName("property1"); + configureDetails1.setPreviousValue("1"); + configureDetails1.setValue("2"); + action1.setActionDetails(configureDetails1); + flowConfigHistory = Collections.singletonList(action1); } @Test @@ -524,6 +548,24 @@ class TestQueryNiFiReportingTask { assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid")); } + @Test + void 
testFlowConfigHistoryTable() throws InitializationException { + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink"); + properties.put(QueryMetricsUtil.QUERY, "select * from FLOW_CONFIG_HISTORY"); + reportingTask = initTask(properties); + reportingTask.onTrigger(context); + + List<Map<String, Object>> rows = mockRecordSinkService.getRows(); + assertEquals(1, rows.size()); + // Validate the first row + Map<String, Object> row = rows.get(0); + assertEquals(22, row.size()); + // Verify the first row contents + assertEquals(123, row.get("actionId")); + assertEquals("Configure", row.get("actionOperation")); + } + private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException { final ComponentLog logger = mock(ComponentLog.class); @@ -552,6 +594,7 @@ class TestQueryNiFiReportingTask { final EventAccess eventAccess = mock(EventAccess.class); Mockito.when(context.getEventAccess()).thenReturn(eventAccess); Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); + Mockito.when(eventAccess.getFlowChanges(anyInt(), anyInt())).thenReturn(flowConfigHistory); final PropertyValue pValue = mock(StandardPropertyValue.class); mockRecordSinkService = new MockRecordSinkService();
