This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 09bc5bcb5a988f08d1c703ca3c4476f3f23aeb61
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Oct 25 15:19:59 2022 -0400

    NIFI-10699: Add FLOW_CONFIG_HISTORY table to QueryNiFiReportingTask
    
    This closes #6581
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-sql-reporting-tasks/pom.xml               |   6 +
 .../nifi/reporting/sql/MetricsSqlQueryService.java |   3 +
 .../nifi/reporting/sql/QueryNiFiReportingTask.java |  11 +-
 .../FlowConfigHistoryEnumerator.java               | 151 +++++++++++++++
 .../FlowConfigHistoryProjectTableScanRule.java     |  76 ++++++++
 .../flowconfighistory/FlowConfigHistoryTable.java  | 211 +++++++++++++++++++++
 .../FlowConfigHistoryTableScan.java                |  91 +++++++++
 .../nifi/reporting/sql/util/TrackedQueryTime.java  |   4 +-
 .../reporting/sql/TestQueryNiFiReportingTask.java  |  43 +++++
 9 files changed, 591 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
index 144c31fca6..97c188f707 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
@@ -91,6 +91,12 @@
             <version>1.19.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-user-actions</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-rules-engine-service-api</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
index 588da4aa85..09b3be0f4a 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsSqlQueryService.java
@@ -30,6 +30,7 @@ import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.sql.bulletins.BulletinTable;
 import org.apache.nifi.reporting.sql.connectionstatus.ConnectionStatusTable;
 import 
org.apache.nifi.reporting.sql.connectionstatuspredictions.ConnectionStatusPredictionsTable;
+import org.apache.nifi.reporting.sql.flowconfighistory.FlowConfigHistoryTable;
 import org.apache.nifi.reporting.sql.metrics.JvmMetricsTable;
 import 
org.apache.nifi.reporting.sql.processgroupstatus.ProcessGroupStatusTable;
 import org.apache.nifi.reporting.sql.processorstatus.ProcessorStatusTable;
@@ -169,6 +170,8 @@ public class MetricsSqlQueryService implements 
MetricsQueryService {
         rootSchema.add("BULLETINS", bulletinTable);
         final ProvenanceTable provenanceTable = new ProvenanceTable(context, 
getLogger());
         rootSchema.add("PROVENANCE", provenanceTable);
+        final FlowConfigHistoryTable flowConfigHistoryTable = new 
FlowConfigHistoryTable(context, getLogger());
+        rootSchema.add("FLOW_CONFIG_HISTORY", flowConfigHistoryTable);
 
         rootSchema.setCacheEnabled(false);
 
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index 61edaa2773..3d1b0b1a26 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -41,16 +41,18 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_END_TIME;
 import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.BULLETIN_START_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_END_TIME;
+import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.FLOW_CONFIG_HISTORY_START_TIME;
 import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_END_TIME;
 import static 
org.apache.nifi.reporting.sql.util.TrackedQueryTime.PROVENANCE_START_TIME;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_PRECISION;
 import static 
org.apache.nifi.util.db.JdbcProperties.VARIABLE_REGISTRY_ONLY_DEFAULT_SCALE;
 
-@Tags({"status", "connection", "processor", "jvm", "metrics", "history", 
"bulletin", "prediction", "process", "group", "provenance", "record", "sql"})
+@Tags({"status", "connection", "processor", "jvm", "metrics", "history", 
"bulletin", "prediction", "process", "group", "provenance", "record", "sql", 
"flow", "config"})
 @CapabilityDescription("Publishes NiFi status information based on the results 
of a user-specified SQL query. The query may make use of the CONNECTION_STATUS, 
PROCESSOR_STATUS, "
-        + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, 
CONNECTION_STATUS_PREDICTIONS, or PROVENANCE tables, and can use any functions 
or capabilities provided by Apache Calcite. Note that the "
-        + "CONNECTION_STATUS_PREDICTIONS table is not available for querying 
if analytics are not enabled (see the nifi.analytics.predict.enabled property 
in nifi.properties). Attempting a "
-        + "query on the table when the capability is disabled will cause an 
error.")
+        + "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, 
CONNECTION_STATUS_PREDICTIONS, FLOW_CONFIG_HISTORY, or PROVENANCE tables, and 
can use any functions or capabilities provided by "
+        + "Apache Calcite. Note that the CONNECTION_STATUS_PREDICTIONS table 
is not available for querying if analytics are not enabled (see the 
nifi.analytics.predict.enabled property "
+        + "in nifi.properties). Attempting a query on the table when the 
capability is disabled will cause an error.")
 @Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last execution time so that on restart the task knows where it left off.")
 public class QueryNiFiReportingTask extends AbstractReportingTask implements 
QueryTimeAware {
 
@@ -92,6 +94,7 @@ public class QueryNiFiReportingTask extends 
AbstractReportingTask implements Que
         try {
             sql = processStartAndEndTimes(context, sql, BULLETIN_START_TIME, 
BULLETIN_END_TIME);
             sql = processStartAndEndTimes(context, sql, PROVENANCE_START_TIME, 
PROVENANCE_END_TIME);
+            sql = processStartAndEndTimes(context, sql, 
FLOW_CONFIG_HISTORY_START_TIME, FLOW_CONFIG_HISTORY_END_TIME);
 
             getLogger().debug("Executing query: {}", sql);
             final QueryResult queryResult = metricsQueryService.query(context, 
sql);
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java
new file mode 100644
index 0000000000..7968a233cc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryEnumerator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting.sql.flowconfighistory;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.nifi.action.Action;
+import org.apache.nifi.action.details.ConfigureDetails;
+import org.apache.nifi.action.details.ConnectDetails;
+import org.apache.nifi.action.details.MoveDetails;
+import org.apache.nifi.action.details.PurgeDetails;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingContext;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class FlowConfigHistoryEnumerator implements Enumerator<Object> {
+    private final ReportingContext context;
+    private final ComponentLog logger;
+    private final int[] fields;
+
+    private Iterator<Action> actionIterator;
+    private Object currentRow;
+    private int recordsRead = 0;
+
+    public FlowConfigHistoryEnumerator(final ReportingContext context, final 
ComponentLog logger, final int[] fields) {
+        this.context = context;
+        this.logger = logger;
+        this.fields = fields;
+        reset();
+    }
+
+    @Override
+    public Object current() {
+        return currentRow;
+    }
+
+    @Override
+    public boolean moveNext() {
+        currentRow = null;
+
+        if (!actionIterator.hasNext()) {
+            // If we are out of data, close the InputStream. We do this because
+            // Calcite does not necessarily call our close() method.
+            close();
+            try {
+                onFinish();
+            } catch (final Exception e) {
+                logger.error("Failed to perform tasks when enumerator was 
finished", e);
+            }
+
+            return false;
+        }
+
+        final Action action = actionIterator.next();
+        currentRow = filterColumns(action);
+
+        recordsRead++;
+        return true;
+    }
+
+    protected int getRecordsRead() {
+        return recordsRead;
+    }
+
+    protected void onFinish() {
+    }
+
+    private Object filterColumns(final Action action) {
+        if (action == null) {
+            return null;
+        }
+
+        final boolean isClustered = context.isClustered();
+        String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            nodeId = "unknown";
+        }
+
+        final Object[] row = new Object[]{
+                action.getId(),
+                action.getTimestamp().getTime(),
+                action.getUserIdentity(),
+                action.getSourceId(),
+                action.getSourceName(),
+                action.getSourceType(),
+                action.getOperation().toString(),
+                action.getActionDetails() instanceof ConfigureDetails ? 
((ConfigureDetails) action.getActionDetails()).getName() : null,
+                action.getActionDetails() instanceof ConfigureDetails ? 
((ConfigureDetails) action.getActionDetails()).getPreviousValue() : null,
+                action.getActionDetails() instanceof ConfigureDetails ? 
((ConfigureDetails) action.getActionDetails()).getValue() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getSourceId() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getSourceName() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getSourceType() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getDestinationId() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getDestinationName() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getDestinationType() : null,
+                action.getActionDetails() instanceof ConnectDetails ? 
((ConnectDetails) action.getActionDetails()).getRelationship() : null,
+                action.getActionDetails() instanceof MoveDetails ? 
((MoveDetails) action.getActionDetails()).getGroup() : null,
+                action.getActionDetails() instanceof MoveDetails ? 
((MoveDetails) action.getActionDetails()).getGroupId() : null,
+                action.getActionDetails() instanceof MoveDetails ? 
((MoveDetails) action.getActionDetails()).getPreviousGroup() : null,
+                action.getActionDetails() instanceof MoveDetails ? 
((MoveDetails) action.getActionDetails()).getPreviousGroupId() : null,
+                action.getActionDetails() instanceof PurgeDetails ? 
((PurgeDetails) action.getActionDetails()).getEndDate().getTime() : null
+        };
+
+        // If we want no fields just return null
+        if (fields == null) {
+            return row;
+        }
+
+        // If we want only a single field, then Calcite is going to expect us 
to return
+        // the actual value, NOT a 1-element array of values.
+        if (fields.length == 1) {
+            final int desiredCellIndex = fields[0];
+            return row[desiredCellIndex];
+        }
+
+        // Create a new Object array that contains only the desired fields.
+        final Object[] filtered = new Object[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            final int indexToKeep = fields[i];
+            filtered[i] = row[indexToKeep];
+        }
+
+        return filtered;
+    }
+
+    @Override
+    public void reset() {
+        List<Action> fullFlowConfigHistoryList = 
context.getEventAccess().getFlowChanges(0, Short.MAX_VALUE);
+        actionIterator = fullFlowConfigHistoryList.iterator();
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java
new file mode 100644
index 0000000000..9f33e4f00d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryProjectTableScanRule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.flowconfighistory;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * Planner rule that projects from a {@link FlowConfigHistoryTableScan} scan 
just the columns
+ * needed to satisfy a projection. If the projection's expressions are trivial,
+ * the projection is removed.
+ */
+public class FlowConfigHistoryProjectTableScanRule extends RelOptRule {
+    public static final FlowConfigHistoryProjectTableScanRule INSTANCE = new 
FlowConfigHistoryProjectTableScanRule();
+
+    private FlowConfigHistoryProjectTableScanRule() {
+        super(
+            operand(LogicalProject.class,
+                operand(FlowConfigHistoryTableScan.class, none())),
+            "BulletinProjectTableScanRule");
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final LogicalProject project = call.rel(0);
+        final FlowConfigHistoryTableScan scan = call.rel(1);
+        final int[] fields = getProjectFields(project.getProjects());
+
+        if (fields == null) {
+            // Project contains expressions more complex than just field 
references.
+            return;
+        }
+
+        call.transformTo(
+            new FlowConfigHistoryTableScan(
+                scan.getCluster(),
+                scan.getTable(),
+                scan.flowConfigHistoryTable,
+                fields));
+    }
+
+    private int[] getProjectFields(List<RexNode> exps) {
+        final int[] fields = new int[exps.size()];
+
+        for (int i = 0; i < exps.size(); i++) {
+            final RexNode exp = exps.get(i);
+
+            if (exp instanceof RexInputRef) {
+                fields[i] = ((RexInputRef) exp).getIndex();
+            } else {
+                return null; // not a simple projection
+            }
+        }
+
+        return fields;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java
new file mode 100644
index 0000000000..c5af557030
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTable.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.flowconfighistory;
+
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.Pair;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.ReportingContext;
+
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+public class FlowConfigHistoryTable extends AbstractTable implements 
QueryableTable, TranslatableTable {
+
+    private final ComponentLog logger;
+
+    private RelDataType relDataType = null;
+
+    private final ReportingContext context;
+    private volatile int maxRecordsRead;
+
+    private final Set<FlowConfigHistoryEnumerator> enumerators = new 
HashSet<>();
+
+    /**
+     * Creates a Flow Configuration History table.
+     */
+    public FlowConfigHistoryTable(final ReportingContext context, final 
ComponentLog logger) {
+        this.context = context;
+        this.logger = logger;
+    }
+
+    @Override
+    public String toString() {
+        return "FlowConfigHistoryTable";
+    }
+
+    public void close() {
+        synchronized (enumerators) {
+            for (final FlowConfigHistoryEnumerator enumerator : enumerators) {
+                enumerator.close();
+            }
+        }
+    }
+
+    /**
+     * Returns an enumerable over a given projection of the fields.
+     *
+     * <p>
+     * Called from generated code.
+     */
+    public Enumerable<Object> project(final int[] fields) {
+        return new AbstractEnumerable<Object>() {
+            @Override
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            public Enumerator<Object> enumerator() {
+                final FlowConfigHistoryEnumerator flowConfigHistoryEnumerator 
= new FlowConfigHistoryEnumerator(context, logger, fields) {
+                    @Override
+                    protected void onFinish() {
+                        final int recordCount = getRecordsRead();
+                        if (recordCount > maxRecordsRead) {
+                            maxRecordsRead = recordCount;
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        synchronized (enumerators) {
+                            enumerators.remove(this);
+                        }
+                        super.close();
+                    }
+                };
+
+                synchronized (enumerators) {
+                    enumerators.add(flowConfigHistoryEnumerator);
+                }
+
+                return flowConfigHistoryEnumerator;
+            }
+        };
+    }
+
+    public int getRecordsRead() {
+        return maxRecordsRead;
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Expression getExpression(final SchemaPlus schema, final String 
tableName, final Class clazz) {
+        return Schemas.tableExpression(schema, getElementType(), tableName, 
clazz);
+    }
+
+    @Override
+    public Type getElementType() {
+        return Object[].class;
+    }
+
+    @Override
+    public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, 
final SchemaPlus schema, final String tableName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RelNode toRel(final RelOptTable.ToRelContext context, final 
RelOptTable relOptTable) {
+        // Request all fields.
+        final int fieldCount = relOptTable.getRowType().getFieldCount();
+        final int[] fields = new int[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = i;
+        }
+
+        return new FlowConfigHistoryTableScan(context.getCluster(), 
relOptTable, this, fields);
+    }
+
+    @Override
+    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+        if (relDataType != null) {
+            return relDataType;
+        }
+
+        final List<String> names = Arrays.asList(
+                "actionId",
+                "actionTimestamp",
+                "actionUserIdentity",
+                "actionSourceId",
+                "actionSourceName",
+                "actionSourceType",
+                "actionOperation",
+                "configureDetailsName",
+                "configureDetailsPreviousValue",
+                "configureDetailsValue",
+                "connectionSourceId",
+                "connectionSourceName",
+                "connectionSourceType",
+                "connectionDestinationId",
+                "connectionDestinationName",
+                "connectionDestinationType",
+                "connectionRelationship",
+                "moveGroup",
+                "moveGroupId",
+                "movePreviousGroup",
+                "movePreviousGroupId",
+                "purgeEndDate"
+        );
+        final List<RelDataType> types = Arrays.asList(
+                typeFactory.createJavaType(int.class),
+                typeFactory.createJavaType(long.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(String.class),
+                typeFactory.createJavaType(long.class)
+        );
+
+        relDataType = typeFactory.createStructType(Pair.zip(names, types));
+        return relDataType;
+    }
+
+    @Override
+    public TableType getJdbcTableType() {
+        return TableType.TEMPORARY_TABLE;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java
new file mode 100644
index 0000000000..b5ae01e6f6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/flowconfighistory/FlowConfigHistoryTableScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.flowconfighistory;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a query for bulletin information.
+ *
+ * <p>
+ * Like any table scan, it serves as a leaf node of a query tree.
+ * </p>
+ */
+public class FlowConfigHistoryTableScan extends TableScan implements 
EnumerableRel {
+    final FlowConfigHistoryTable flowConfigHistoryTable;
+    final int[] fields;
+
+    protected FlowConfigHistoryTableScan(final RelOptCluster cluster, final 
RelOptTable table, final FlowConfigHistoryTable flowConfigHistoryTable, final 
int[] fields) {
+        super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), 
table);
+
+        this.flowConfigHistoryTable = flowConfigHistoryTable;
+        this.fields = fields;
+    }
+
+    @Override
+    public RelNode copy(final RelTraitSet traitSet, final List<RelNode> 
inputs) {
+        return new FlowConfigHistoryTableScan(getCluster(), table, 
flowConfigHistoryTable, fields);
+    }
+
+    @Override
+    public RelWriter explainTerms(final RelWriter pw) {
+        return super.explainTerms(pw).item("fields", Primitive.asList(fields));
+    }
+
+    @Override
+    public RelDataType deriveRowType() {
+        final List<RelDataTypeField> fieldList = 
table.getRowType().getFieldList();
+        final RelDataTypeFactory.Builder builder = 
getCluster().getTypeFactory().builder();
+        for (int field : fields) {
+            builder.add(fieldList.get(field));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void register(RelOptPlanner planner) {
+        planner.addRule(FlowConfigHistoryProjectTableScanRule.INSTANCE);
+    }
+
+    @Override
+    public Result implement(EnumerableRelImplementor implementor, Prefer pref) 
{
+        PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), 
getRowType(), pref.preferArray());
+
+        return implementor.result(physType, Blocks.toBlock(
+            
Expressions.call(table.getExpression(FlowConfigHistoryTable.class), "project", 
Expressions.constant(fields))));
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
index cf46a291a4..a9c003125e 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/TrackedQueryTime.java
@@ -21,7 +21,9 @@ public enum TrackedQueryTime {
     BULLETIN_START_TIME("$bulletinStartTime"),
     BULLETIN_END_TIME("$bulletinEndTime"),
     PROVENANCE_START_TIME("$provenanceStartTime"),
-    PROVENANCE_END_TIME("$provenanceEndTime");
+    PROVENANCE_END_TIME("$provenanceEndTime"),
+    FLOW_CONFIG_HISTORY_START_TIME("$flowConfigHistoryStartTime"),
+    FLOW_CONFIG_HISTORY_END_TIME("$flowConfigHistoryEndTime");
 
     private final String sqlPlaceholder;
 
diff --git 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index 286cf07067..6b0f1c36a6 100644
--- 
a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -17,6 +17,11 @@
 package org.apache.nifi.reporting.sql;
 
 
+import org.apache.nifi.action.Action;
+import org.apache.nifi.action.Component;
+import org.apache.nifi.action.FlowChangeAction;
+import org.apache.nifi.action.Operation;
+import org.apache.nifi.action.details.FlowChangeConfigureDetails;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -61,6 +66,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 
 class TestQueryNiFiReportingTask {
@@ -86,6 +93,7 @@ class TestQueryNiFiReportingTask {
     private MockProvenanceRepository mockProvenanceRepository;
     private AtomicLong currentTime;
     private MockStateManager mockStateManager;
+    private List<Action> flowConfigHistory;
 
     @BeforeEach
     public void setup() {
@@ -105,6 +113,7 @@ class TestQueryNiFiReportingTask {
         // create a processor status with processing time
         ProcessorStatus procStatus = new ProcessorStatus();
         procStatus.setId("proc");
+        procStatus.setName("Processor 1");
         procStatus.setProcessingNanos(123456789);
 
         Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
@@ -159,6 +168,21 @@ class TestQueryNiFiReportingTask {
         groupStatuses.add(groupStatus3);
         status.setProcessGroupStatus(groupStatuses);
 
+        // Populate flow config history
+        FlowChangeAction action1 = new FlowChangeAction();
+        action1.setId(123);
+        action1.setTimestamp(new Date());
+        action1.setUserIdentity("test");
+        action1.setSourceId("proc");
+        action1.setSourceName("Processor 1");
+        action1.setSourceType(Component.Processor);
+        action1.setOperation(Operation.Configure);
+        FlowChangeConfigureDetails configureDetails1 = new 
FlowChangeConfigureDetails();
+        configureDetails1.setName("property1");
+        configureDetails1.setPreviousValue("1");
+        configureDetails1.setValue("2");
+        action1.setActionDetails(configureDetails1);
+        flowConfigHistory = Collections.singletonList(action1);
     }
 
     @Test
@@ -524,6 +548,24 @@ class TestQueryNiFiReportingTask {
         assertEquals(flowFileUuid, row.get("bulletinFlowFileUuid"));
     }
 
+    @Test
+    void testFlowConfigHistoryTable() throws InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from 
FLOW_CONFIG_HISTORY");
+        reportingTask = initTask(properties);
+        reportingTask.onTrigger(context);
+
+        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
+        assertEquals(1, rows.size());
+        // Validate the first row
+        Map<String, Object> row = rows.get(0);
+        assertEquals(22, row.size());
+        // Verify the first row contents
+        assertEquals(123, row.get("actionId"));
+        assertEquals("Configure", row.get("actionOperation"));
+    }
+
     private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, 
String> customProperties) throws InitializationException {
 
         final ComponentLog logger = mock(ComponentLog.class);
@@ -552,6 +594,7 @@ class TestQueryNiFiReportingTask {
         final EventAccess eventAccess = mock(EventAccess.class);
         Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
         Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
+        Mockito.when(eventAccess.getFlowChanges(anyInt(), 
anyInt())).thenReturn(flowConfigHistory);
 
         final PropertyValue pValue = mock(StandardPropertyValue.class);
         mockRecordSinkService = new MockRecordSinkService();


Reply via email to