This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2aa11aadfd NIFI-15508 Added Record Gauge support to Standard Process
Session
2aa11aadfd is described below
commit 2aa11aadfdc25d48321aa9a4b9bcb1b744c007b7
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Jan 23 15:05:23 2026 -0600
NIFI-15508 Added Record Gauge support to Standard Process Session
- Added ComponentMetricReporter to Framework API
- Added application property for configuring ComponentMetricReporter
- Added getGaugeValues method to TestRunner
This closes #10813.
Signed-off-by: Pierre Villard <[email protected]>
---
.../src/main/asciidoc/administration-guide.adoc | 9 +++
.../controller/metrics/ComponentMetricContext.java | 35 +++++++++
.../metrics/ComponentMetricReporter.java | 54 +++++++++++++
...omponentMetricReporterConfigurationContext.java | 40 ++++++++++
.../nifi/controller/metrics/CounterRecord.java | 35 +++++++++
.../nifi/controller/metrics/GaugeRecord.java | 35 +++++++++
.../metrics/DefaultComponentMetricReporter.java | 32 ++++++++
...omponentMetricReporterConfigurationContext.java | 44 +++++++++++
.../repository/AbstractRepositoryContext.java | 62 +++++++++++----
.../controller/repository/RepositoryContext.java | 6 ++
.../repository/StandardProcessSession.java | 48 +++++++++---
.../org/apache/nifi/controller/FlowController.java | 35 ++++++++-
.../flow/StandardStatelessGroupNodeFactory.java | 1 +
.../repository/StandardRepositoryContext.java | 18 ++++-
.../scheduling/RepositoryContextFactory.java | 31 ++++++--
.../configuration/FlowControllerConfiguration.java | 33 ++++++++
.../repository/StandardProcessSessionIT.java | 12 ++-
.../nar/StandardExtensionDiscoveringManager.java | 2 +
.../apache/nifi/headless/HeadlessNiFiServer.java | 4 +
.../org/apache/nifi/util/MockProcessSession.java | 23 ++++++
.../org/apache/nifi/util/SharedSessionState.java | 14 ++++
.../nifi/util/StandardProcessorTestRunner.java | 5 ++
.../main/java/org/apache/nifi/util/TestRunner.java | 8 ++
.../nifi/util/TestStandardProcessorTestRunner.java | 47 +++++++-----
.../flow/StandardStatelessDataflowFactory.java | 5 +-
.../repository/StatelessRepositoryContext.java | 17 ++++-
.../StatelessRepositoryContextFactory.java | 15 +++-
.../pom.xml | 25 +++---
.../pom.xml | 24 +++---
.../metrics/SystemTestComponentMetricReporter.java | 88 ++++++++++++++++++++++
...nifi.controller.metrics.ComponentMetricReporter | 15 ++++
.../pom.xml | 16 +---
.../nifi/processors/tests/system/UpdateMetric.java | 58 ++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
nifi-system-tests/nifi-system-test-suite/pom.xml | 6 ++
.../src/test/assembly/dependencies.xml | 1 +
.../system/metrics/ComponentMetricReporterIT.java | 75 ++++++++++++++++++
nifi-system-tests/pom.xml | 1 +
38 files changed, 873 insertions(+), 107 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 869431aab0..afa1522015 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2922,6 +2922,15 @@ The Flow Action Reporter is a framework interface that
supports exporting flow c
|`nifi.flow.action.reporter.implementation` | The class implementing
`org.apache.nifi.action.FlowActionReporter` from `nifi-framework-api`. The
default value is not specified.
|====
+=== Component Metric Reporter
+
+The Component Metric Reporter is a framework interface that supports exporting
processing metrics including Counters and Gauges using a custom implementation
class.
+
+|====
+|*Property* | *Description*
+|`nifi.component.metric.reporter.implementation` | The class implementing
`org.apache.nifi.controller.metrics.ComponentMetricReporter` from
`nifi-framework-api`. The default value is not specified.
+|====
+
=== FlowFile Repository
The FlowFile repository keeps track of the attributes and current state of
each FlowFile in the system. By default,
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricContext.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricContext.java
new file mode 100644
index 0000000000..4396152713
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.util.Map;
+
+/**
+ * Context associated with Component Metric records
+ *
+ * @param id Component Identifier
+ * @param name Component Name
+ * @param componentType Component Type
+ * @param attributes Map of additional attributes containing Process Group and
related information
+ */
+public record ComponentMetricContext(
+ String id,
+ String name,
+ String componentType,
+ Map<String, String> attributes
+) {
+}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporter.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporter.java
new file mode 100644
index 0000000000..8468e40562
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.io.Closeable;
+
+/**
+ * Framework abstraction for reporting metrics emitted from processing
components
+ */
+public interface ComponentMetricReporter extends Closeable {
+ /**
+ * Configuration lifecycle method that the application invokes after
instantiating the class
+ *
+ * @param context Configuration Context properties
+ */
+ default void onConfigured(ComponentMetricReporterConfigurationContext
context) {
+
+ }
+
+ /**
+ * Close resources created during Reporter configuration and processing
+ */
+ @Override
+ default void close() {
+ }
+
+ /**
+ * Record Gauge Record
+ *
+ * @param gaugeRecord Gauge Record required
+ */
+ void recordGauge(GaugeRecord gaugeRecord);
+
+ /**
+ * Record Counter Record
+ *
+ * @param counterRecord Counter Record required
+ */
+ void recordCounter(CounterRecord counterRecord);
+}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporterConfigurationContext.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporterConfigurationContext.java
new file mode 100644
index 0000000000..2d8aa0fa1f
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/ComponentMetricReporterConfigurationContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.X509TrustManager;
+
+/**
+ * Configuration Context with properties provided to Component Metric Reporter
on initial configuration
+ */
+public interface ComponentMetricReporterConfigurationContext {
+ /**
+ * Get SSL Context when configured in application properties
+ *
+ * @return SSLContext or empty when not configured
+ */
+ Optional<SSLContext> getSSLContext();
+
+ /**
+ * Get Trust Manager when configured in application properties
+ *
+ * @return X509TrustManager or empty when not configured
+ */
+ Optional<X509TrustManager> getTrustManager();
+}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/CounterRecord.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/CounterRecord.java
new file mode 100644
index 0000000000..ed8a070f07
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/CounterRecord.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.time.Instant;
+
+/**
+ * Single measurement for a named Counter recorded during processing
+ *
+ * @param name Counter Name
+ * @param value Counter Value
+ * @param recorded Timestamp when the Component recorded the Counter value
+ * @param componentMetricContext Context for Component Metric record
+ */
+public record CounterRecord(
+ String name,
+ long value,
+ Instant recorded,
+ ComponentMetricContext componentMetricContext
+) {
+}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/GaugeRecord.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/GaugeRecord.java
new file mode 100644
index 0000000000..32971f1cb1
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/metrics/GaugeRecord.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.time.Instant;
+
+/**
+ * Single measurement for a named Gauge recorded during processing
+ *
+ * @param name Gauge Name
+ * @param value Gauge Value
+ * @param recorded Timestamp when the Processor recorded the Gauge value
+ * @param componentMetricContext Context for Component Metric record
+ */
+public record GaugeRecord(
+ String name,
+ double value,
+ Instant recorded,
+ ComponentMetricContext componentMetricContext
+) {
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/DefaultComponentMetricReporter.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/DefaultComponentMetricReporter.java
new file mode 100644
index 0000000000..a11119f51a
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/DefaultComponentMetricReporter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+/**
+ * Default implementation of Component Metric Reporter with empty method
implementations
+ */
+public class DefaultComponentMetricReporter implements ComponentMetricReporter
{
+ @Override
+ public void recordGauge(final GaugeRecord gaugeRecord) {
+
+ }
+
+ @Override
+ public void recordCounter(final CounterRecord counterRecord) {
+
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/StandardComponentMetricReporterConfigurationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/StandardComponentMetricReporterConfigurationContext.java
new file mode 100644
index 0000000000..8e2e866084
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/metrics/StandardComponentMetricReporterConfigurationContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.X509TrustManager;
+
+/**
+ * Standard implementation of Configuration Context properties for Component
Metric Reporter
+ */
+public class StandardComponentMetricReporterConfigurationContext implements
ComponentMetricReporterConfigurationContext {
+ private final SSLContext sslContext;
+ private final X509TrustManager trustManager;
+
+ public StandardComponentMetricReporterConfigurationContext(final
SSLContext sslContext, final X509TrustManager trustManager) {
+ this.sslContext = sslContext;
+ this.trustManager = trustManager;
+ }
+
+ @Override
+ public Optional<SSLContext> getSSLContext() {
+ return Optional.ofNullable(sslContext);
+ }
+
+ @Override
+ public Optional<X509TrustManager> getTrustManager() {
+ return Optional.ofNullable(trustManager);
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/AbstractRepositoryContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/AbstractRepositoryContext.java
index 31a8e84fbc..c7d74449b3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/AbstractRepositoryContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/AbstractRepositoryContext.java
@@ -22,6 +22,10 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.metrics.ComponentMetricContext;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
+import org.apache.nifi.controller.metrics.CounterRecord;
+import org.apache.nifi.controller.metrics.GaugeRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.InternalProvenanceReporter;
@@ -31,10 +35,12 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.util.Connectables;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
@@ -44,23 +50,41 @@ public abstract class AbstractRepositoryContext implements
RepositoryContext {
private final FlowFileRepository flowFileRepo;
private final FlowFileEventRepository flowFileEventRepo;
private final CounterRepository counterRepo;
+ private final ComponentMetricReporter componentMetricReporter;
private final ProvenanceEventRepository provenanceRepo;
private final AtomicLong connectionIndex;
private final StateManager stateManager;
-
- public AbstractRepositoryContext(final Connectable connectable, final
AtomicLong connectionIndex, final ContentRepository contentRepository,
- final FlowFileRepository
flowFileRepository, final FlowFileEventRepository flowFileEventRepository,
- final CounterRepository
counterRepository, final ProvenanceEventRepository provenanceRepository,
- final StateManager stateManager) {
+ private final ComponentMetricContext componentMetricContext;
+
+ private final String componentNameCounterContext;
+ private final String componentTypeCounterContext;
+
+ public AbstractRepositoryContext(
+ final Connectable connectable,
+ final AtomicLong connectionIndex,
+ final ContentRepository contentRepository,
+ final FlowFileRepository flowFileRepository,
+ final FlowFileEventRepository flowFileEventRepository,
+ final CounterRepository counterRepository,
+ final ComponentMetricReporter componentMetricReporter,
+ final ProvenanceEventRepository provenanceRepository,
+ final StateManager stateManager
+ ) {
this.connectable = connectable;
- contentRepo = contentRepository;
- flowFileRepo = flowFileRepository;
- flowFileEventRepo = flowFileEventRepository;
- counterRepo = counterRepository;
- provenanceRepo = provenanceRepository;
+ this.contentRepo = contentRepository;
+ this.flowFileRepo = flowFileRepository;
+ this.flowFileEventRepo = flowFileEventRepository;
+ this.counterRepo = counterRepository;
+ this.componentMetricReporter = componentMetricReporter;
+ this.provenanceRepo = provenanceRepository;
this.connectionIndex = connectionIndex;
this.stateManager = stateManager;
+ final Map<String, String> groupAttributes =
connectable.getProcessGroup().getLoggingAttributes();
+ this.componentMetricContext = new
ComponentMetricContext(connectable.getIdentifier(), connectable.getName(),
connectable.getComponentType(), groupAttributes);
+
+ this.componentNameCounterContext = connectable.getName() + " (" +
connectable.getIdentifier() + ")";
+ this.componentTypeCounterContext = "All " +
connectable.getComponentType() + "'s";
}
@Override
@@ -68,6 +92,11 @@ public abstract class AbstractRepositoryContext implements
RepositoryContext {
return connectable;
}
+ @Override
+ public ComponentMetricContext getComponentMetricContext() {
+ return componentMetricContext;
+ }
+
/**
*
* @param relationship relationship
@@ -133,11 +162,16 @@ public abstract class AbstractRepositoryContext
implements RepositoryContext {
@Override
public void adjustCounter(final String name, final long delta) {
- final String localContext = connectable.getName() + " (" +
connectable.getIdentifier() + ")";
- final String globalContext = "All " + connectable.getComponentType() +
"'s";
+ counterRepo.adjustCounter(componentNameCounterContext, name, delta);
+ counterRepo.adjustCounter(componentTypeCounterContext, name, delta);
+
+ final CounterRecord counterRecord = new CounterRecord(name, delta,
Instant.now(), componentMetricContext);
+ componentMetricReporter.recordCounter(counterRecord);
+ }
- counterRepo.adjustCounter(localContext, name, delta);
- counterRepo.adjustCounter(globalContext, name, delta);
+ @Override
+ public void recordGauge(final GaugeRecord gaugeRecord) {
+ componentMetricReporter.recordGauge(gaugeRecord);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
index bc0cc8eb19..25051d9ee3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
@@ -20,6 +20,8 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.metrics.ComponentMetricContext;
+import org.apache.nifi.controller.metrics.GaugeRecord;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.flowfile.FlowFile;
@@ -35,6 +37,8 @@ import java.util.function.Predicate;
public interface RepositoryContext {
Connectable getConnectable();
+ ComponentMetricContext getComponentMetricContext();
+
Collection<Connection> getConnections(Relationship relationship);
List<Connection> getPollableConnections();
@@ -61,6 +65,8 @@ public interface RepositoryContext {
void adjustCounter(String name, long delta);
+ void recordGauge(GaugeRecord gaugeRecord);
+
ProvenanceEventBuilder createProvenanceEventBuilder();
StateManager getStateManager();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a12f7a626e..6b35df4cbd 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -24,6 +24,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
+import org.apache.nifi.controller.metrics.GaugeRecord;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
@@ -55,6 +56,7 @@ import
org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.provenance.InternalProvenanceReporter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -80,6 +82,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -152,6 +155,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
private Map<String, Long> countersOnCommit;
private Map<String, Long> immediateCounters;
+ private List<GaugeRecord> gaugeRecordsSessionCommitted;
private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>(); // UUID of
any FlowFile that was created in this session
@@ -671,6 +675,10 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
context.adjustCounter(entry.getKey(), entry.getValue());
}
+ for (final GaugeRecord gaugeRecord :
checkpoint.gaugeRecordsSessionCommitted) {
+ context.recordGauge(gaugeRecord);
+ }
+
if (LOG.isDebugEnabled()) {
final StringBuilder timingInfo = new StringBuilder();
timingInfo.append("Session commit for ").append(this).append("
[").append(connectableDescription).append("]").append(" took ");
@@ -1419,6 +1427,9 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
if (immediateCounters != null) {
immediateCounters.clear();
}
+ if (gaugeRecordsSessionCommitted != null) {
+ gaugeRecordsSessionCommitted.clear();
+ }
generatedProvenanceEvents.clear();
forkEventBuilders.clear();
@@ -1836,6 +1847,24 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
+ connection + " but a FlowFile with that ID already exists in the
session");
}
+ @Override
+ public void recordGauge(final String name, final double value, final
CommitTiming commitTiming) {
+ Objects.requireNonNull(name, "Gauge Name required");
+ Objects.requireNonNull(commitTiming, "Commit Timing required");
+
+ final Instant recorded = Instant.now();
+ final GaugeRecord gaugeRecord = new GaugeRecord(name, value, recorded,
context.getComponentMetricContext());
+
+ if (CommitTiming.NOW == commitTiming) {
+ context.recordGauge(gaugeRecord);
+ } else {
+ if (gaugeRecordsSessionCommitted == null) {
+ gaugeRecordsSessionCommitted = new ArrayList<>();
+ }
+ gaugeRecordsSessionCommitted.add(gaugeRecord);
+ }
+ }
+
@Override
public void adjustCounter(final String name, final long delta, final
boolean immediate) {
// If we are adjusting the counter immediately, allow it even if the
task is terminated. The contract states:
@@ -1858,23 +1887,16 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
counters = countersOnCommit;
}
- adjustCounter(name, delta, counters);
+ // Set current value or adjust when found
+ counters.compute(name, (currentName, currentValue) ->
+ currentValue == null ? delta : currentValue + delta
+ );
if (immediate) {
context.adjustCounter(name, delta);
}
}
- private void adjustCounter(final String name, final long delta, final
Map<String, Long> map) {
- Long curVal = map.get(name);
- if (curVal == null) {
- curVal = 0L;
- }
-
- final long newValue = curVal + delta;
- map.put(name, newValue);
- }
-
@Override
public FlowFile get() {
verifyTaskActive();
@@ -3887,6 +3909,8 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
private Map<String, Long> countersOnCommit;
private Map<String, Long> immediateCounters;
+ private List<GaugeRecord> gaugeRecordsSessionCommitted;
+
private Map<FlowFile, Path> deleteOnCommit;
private Set<String> removedFlowFiles;
private Set<String> createdFlowFiles;
@@ -3923,6 +3947,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
countersOnCommit = new HashMap<>();
immediateCounters = new HashMap<>();
+ gaugeRecordsSessionCommitted = new ArrayList<>();
deleteOnCommit = new HashMap<>();
removedFlowFiles = new HashSet<>();
@@ -3956,6 +3981,7 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
this.connectionCounts = session.connectionCounts;
this.countersOnCommit = session.countersOnCommit == null ?
Collections.emptyMap() : session.countersOnCommit;
this.immediateCounters = session.immediateCounters == null ?
Collections.emptyMap() : session.immediateCounters;
+ this.gaugeRecordsSessionCommitted =
session.gaugeRecordsSessionCommitted == null ? List.of() :
session.gaugeRecordsSessionCommitted;
this.deleteOnCommit = session.deleteOnCommit;
this.removedFlowFiles = session.removedFlowFiles;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 23f11db5fb..af0eecb03f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -74,6 +74,7 @@ import
org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import
org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.QueueSize;
@@ -288,6 +289,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
private final NiFiProperties nifiProperties;
private final Set<RemoteSiteListener> externalSiteListeners = new
HashSet<>();
private final AtomicReference<CounterRepository> counterRepositoryRef;
+ private final ComponentMetricReporter componentMetricReporter;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
@@ -405,6 +407,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
final NiFiProperties properties,
final Authorizer authorizer,
final AuditService auditService,
+ final ComponentMetricReporter componentMetricReporter,
final PropertyEncryptor encryptor,
final BulletinRepository bulletinRepo,
final ExtensionDiscoveringManager extensionManager,
@@ -419,6 +422,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
properties,
authorizer,
auditService,
+ componentMetricReporter,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null,
@@ -440,6 +444,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
final NiFiProperties properties,
final Authorizer authorizer,
final AuditService auditService,
+ final ComponentMetricReporter componentMetricReporter,
final PropertyEncryptor encryptor,
final NodeProtocolSender protocolSender,
final BulletinRepository bulletinRepo,
@@ -459,6 +464,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
properties,
authorizer,
auditService,
+ componentMetricReporter,
encryptor,
/* configuredForClustering */ true,
protocolSender,
@@ -480,6 +486,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
final NiFiProperties nifiProperties,
final Authorizer authorizer,
final AuditService auditService,
+ final ComponentMetricReporter componentMetricReporter,
final PropertyEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender,
@@ -504,6 +511,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
this.clusterCoordinator = clusterCoordinator;
this.authorizer = authorizer;
this.auditService = auditService;
+ this.componentMetricReporter = componentMetricReporter;
this.configuredForClustering = configuredForClustering;
this.revisionManager = revisionManager;
this.statusHistoryRepository = statusHistoryRepository;
@@ -563,8 +571,16 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
parameterContextManager = new StandardParameterContextManager();
final long maxAppendableBytes = getMaxAppendableBytes();
- repositoryContextFactory = new
RepositoryContextFactory(contentRepository, flowFileRepository,
flowFileEventRepository,
- counterRepositoryRef.get(), provenanceRepository,
stateManagerProvider, maxAppendableBytes);
+ repositoryContextFactory = new RepositoryContextFactory(
+ contentRepository,
+ flowFileRepository,
+ flowFileEventRepository,
+ counterRepositoryRef.get(),
+ getComponentMetricReporter(),
+ provenanceRepository,
+ stateManagerProvider,
+ maxAppendableBytes
+ );
assetManager = createAssetManager(nifiProperties);
this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow
Analysis", true);
@@ -1050,8 +1066,16 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
// Begin expiring FlowFiles that are old
final long maxAppendableClaimBytes = getMaxAppendableBytes();
- final RepositoryContextFactory contextFactory = new
RepositoryContextFactory(contentRepository, flowFileRepository,
- flowFileEventRepository, counterRepositoryRef.get(),
provenanceRepository, stateManagerProvider, maxAppendableClaimBytes);
+ final RepositoryContextFactory contextFactory = new
RepositoryContextFactory(
+ contentRepository,
+ flowFileRepository,
+ flowFileEventRepository,
+ counterRepositoryRef.get(),
+ getComponentMetricReporter(),
+ provenanceRepository,
+ stateManagerProvider,
+ maxAppendableClaimBytes
+ );
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this,
contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
// now that we've loaded the FlowFiles, this has restored our
ContentClaims' states, so we can tell the
@@ -2416,6 +2440,9 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
processScheduler.disableReportingTask(reportingTaskNode);
}
+ public ComponentMetricReporter getComponentMetricReporter() {
+ return componentMetricReporter;
+ }
//
// Counters
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
index 879023445a..0bc191d297 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java
@@ -128,6 +128,7 @@ public class StandardStatelessGroupNodeFactory implements
StatelessGroupNodeFact
flowFileRepository,
flowController.getFlowFileEventRepository(),
flowController.getCounterRepository(),
+ flowController.getComponentMetricReporter(),
flowController.getStateManagerProvider());
final FlowMappingOptions flowMappingOptions = new
FlowMappingOptions.Builder()
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java
index ddb1b4aa8b..4b569269de 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import
org.apache.nifi.controller.repository.claim.StandardContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
@@ -29,10 +30,19 @@ public class StandardRepositoryContext extends
AbstractRepositoryContext impleme
private final long maxAppendableClaimBytes;
- public StandardRepositoryContext(final Connectable connectable, final
AtomicLong connectionIndex, final ContentRepository contentRepository, final
FlowFileRepository flowFileRepository,
- final FlowFileEventRepository
flowFileEventRepository, final CounterRepository counterRepository, final
ProvenanceEventRepository provenanceRepository,
- final StateManager stateManager, final
long maxAppendableClaimBytes) {
- super(connectable, connectionIndex, contentRepository,
flowFileRepository, flowFileEventRepository, counterRepository,
provenanceRepository, stateManager);
+ public StandardRepositoryContext(
+ final Connectable connectable,
+ final AtomicLong connectionIndex,
+ final ContentRepository contentRepository,
+ final FlowFileRepository flowFileRepository,
+ final FlowFileEventRepository flowFileEventRepository,
+ final CounterRepository counterRepository,
+ final ComponentMetricReporter componentMetricReporter,
+ final ProvenanceEventRepository provenanceRepository,
+ final StateManager stateManager,
+ final long maxAppendableClaimBytes
+ ) {
+ super(connectable, connectionIndex, contentRepository,
flowFileRepository, flowFileEventRepository, counterRepository,
componentMetricReporter, provenanceRepository, stateManager);
this.maxAppendableClaimBytes = maxAppendableClaimBytes;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
index a640ca72e5..209b24c63f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/RepositoryContextFactory.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -36,19 +37,26 @@ public class RepositoryContextFactory {
private final FlowFileRepository flowFileRepo;
private final FlowFileEventRepository flowFileEventRepo;
private final CounterRepository counterRepo;
+ private final ComponentMetricReporter componentMetricReporter;
private final ProvenanceRepository provenanceRepo;
private final StateManagerProvider stateManagerProvider;
private final long maxAppendableClaimBytes;
- public RepositoryContextFactory(final ContentRepository contentRepository,
final FlowFileRepository flowFileRepository,
- final FlowFileEventRepository flowFileEventRepository, final
CounterRepository counterRepository,
- final ProvenanceRepository provenanceRepository, final
StateManagerProvider stateManagerProvider,
- final long maxAppendableClaimBytes) {
-
+ public RepositoryContextFactory(
+ final ContentRepository contentRepository,
+ final FlowFileRepository flowFileRepository,
+ final FlowFileEventRepository flowFileEventRepository,
+ final CounterRepository counterRepository,
+ final ComponentMetricReporter componentMetricReporter,
+ final ProvenanceRepository provenanceRepository,
+ final StateManagerProvider stateManagerProvider,
+ final long maxAppendableClaimBytes
+ ) {
this.contentRepo = contentRepository;
this.flowFileRepo = flowFileRepository;
this.flowFileEventRepo = flowFileEventRepository;
this.counterRepo = counterRepository;
+ this.componentMetricReporter = componentMetricReporter;
this.provenanceRepo = provenanceRepository;
this.stateManagerProvider = stateManagerProvider;
this.maxAppendableClaimBytes = maxAppendableClaimBytes;
@@ -59,7 +67,18 @@ public class RepositoryContextFactory {
? ((ProcessorNode) connectable).getProcessor().getClass()
: null;
final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier(),
componentClass);
- return new StandardRepositoryContext(connectable, connectionIndex,
contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo,
stateManager, maxAppendableClaimBytes);
+ return new StandardRepositoryContext(
+ connectable,
+ connectionIndex,
+ contentRepo,
+ flowFileRepo,
+ flowFileEventRepo,
+ counterRepo,
+ componentMetricReporter,
+ provenanceRepo,
+ stateManager,
+ maxAppendableClaimBytes
+ );
}
public ContentRepository getContentRepository() {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
index 51071e5c3b..88d760b341 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
@@ -34,6 +34,10 @@ import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
+import
org.apache.nifi.controller.metrics.ComponentMetricReporterConfigurationContext;
+import org.apache.nifi.controller.metrics.DefaultComponentMetricReporter;
+import
org.apache.nifi.controller.metrics.StandardComponentMetricReporterConfigurationContext;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import
org.apache.nifi.controller.status.history.JsonNodeStatusHistoryDumpFactory;
import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
@@ -88,6 +92,8 @@ public class FlowControllerConfiguration {
private static final String FLOW_ACTION_REPORTER_IMPLEMENTATION =
"nifi.flow.action.reporter.implementation";
+ private static final String COMPONENT_METRIC_REPORTER_IMPLEMENTATION =
"nifi.component.metric.reporter.implementation";
+
private NiFiProperties properties;
private ExtensionDiscoveringManager extensionManager;
@@ -211,6 +217,7 @@ public class FlowControllerConfiguration {
properties,
authorizer,
auditService,
+ componentMetricReporter(),
propertyEncryptor(),
bulletinRepository,
extensionManager,
@@ -225,6 +232,7 @@ public class FlowControllerConfiguration {
properties,
authorizer,
auditService,
+ componentMetricReporter(),
propertyEncryptor(),
nodeProtocolSender,
bulletinRepository,
@@ -501,4 +509,29 @@ public class FlowControllerConfiguration {
return flowActionReporter;
}
+
+ /**
+ * Component Metric Reporter configured from NiFi Application Properties
+ *
+ * @return Component Metric Reporter
+ */
+ @Bean
+ public ComponentMetricReporter componentMetricReporter() {
+ final ComponentMetricReporter componentMetricReporter;
+
+ final String configuredClassName =
properties.getProperty(COMPONENT_METRIC_REPORTER_IMPLEMENTATION);
+ if (configuredClassName == null || configuredClassName.isBlank()) {
+ componentMetricReporter = new DefaultComponentMetricReporter();
+ } else {
+ try {
+ componentMetricReporter =
NarThreadContextClassLoader.createInstance(extensionManager,
configuredClassName, ComponentMetricReporter.class, properties);
+ final ComponentMetricReporterConfigurationContext
configurationContext = new
StandardComponentMetricReporterConfigurationContext(sslContext, trustManager);
+ componentMetricReporter.onConfigured(configurationContext);
+ } catch (final Exception e) {
+ throw new IllegalStateException("Failed to create
ComponentMetricReporter with class [%s]".formatted(configuredClassName), e);
+ }
+ }
+
+ return componentMetricReporter;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 3d616e4332..4df09a7aed 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -28,6 +28,7 @@ import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardProcessorNode;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
@@ -133,6 +134,7 @@ public class StandardProcessSessionIT {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
+ private ComponentMetricReporter componentMetricReporter;
private FlowFileEventRepository flowFileEventRepository;
private ResourceClaimManager resourceClaimManager;
@@ -177,6 +179,7 @@ public class StandardProcessSessionIT {
flowFileEventRepository = new RingBufferEventRepository(1);
counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository();
+ componentMetricReporter = mock(ComponentMetricReporter.class);
final Connection connection = createConnection();
@@ -185,6 +188,7 @@ public class StandardProcessSessionIT {
final ProcessGroup procGroup = Mockito.mock(ProcessGroup.class);
when(procGroup.getIdentifier()).thenReturn("proc-group-identifier-1");
+ when(procGroup.getLoggingAttributes()).thenReturn(Map.of());
connectable = Mockito.mock(Connectable.class);
when(connectable.hasIncomingConnection()).thenReturn(true);
@@ -218,7 +222,7 @@ public class StandardProcessSessionIT {
stateManager.setIgnoreAnnotations(true);
context = new StandardRepositoryContext(connectable, new
AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository,
- counterRepository, provenanceRepo, stateManager, 50_000L);
+ counterRepository, componentMetricReporter, provenanceRepo,
stateManager, 50_000L);
session = new StandardProcessSession(context, () -> false, new
NopPerformanceTracker());
}
@@ -2978,6 +2982,11 @@ public class StandardProcessSessionIT {
}).when(connectable).getConnections(Mockito.any(Relationship.class));
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
+
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+ when(processGroup.getLoggingAttributes()).thenReturn(Map.of());
+ when(connectable.getProcessGroup()).thenReturn(processGroup);
+
return connectable;
}
@@ -2989,6 +2998,7 @@ public class StandardProcessSessionIT {
flowFileRepo,
flowFileEventRepository,
counterRepository,
+ componentMetricReporter,
provenanceRepo,
stateManager,
50_000L);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
index 6e9a6293d2..5e2534adf2 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
@@ -32,6 +32,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -134,6 +135,7 @@ public class StandardExtensionDiscoveringManager implements
ExtensionDiscovering
definitionMap.put(NarPersistenceProvider.class, new HashSet<>());
definitionMap.put(AssetManager.class, new HashSet<>());
definitionMap.put(FlowActionReporter.class, new HashSet<>());
+ definitionMap.put(ComponentMetricReporter.class, new HashSet<>());
additionalExtensionTypes.forEach(type ->
definitionMap.putIfAbsent(type, new HashSet<>()));
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index 9a8230f464..a2936c840f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -34,6 +34,8 @@ import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
+import org.apache.nifi.controller.metrics.DefaultComponentMetricReporter;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
@@ -134,6 +136,7 @@ public class HeadlessNiFiServer implements NiFiServer {
final FrameworkSslContextProvider sslContextProvider = new
FrameworkSslContextProvider(props);
final SSLContext sslContext =
sslContextProvider.loadSslContext().orElse(null);
final StateManagerProvider stateManagerProvider =
StandardStateManagerProvider.create(props, sslContext, extensionManager,
ParameterLookup.EMPTY);
+ final ComponentMetricReporter componentMetricReporter = new
DefaultComponentMetricReporter();
flowController = FlowController.createStandaloneInstance(
flowFileEventRepository,
@@ -141,6 +144,7 @@ public class HeadlessNiFiServer implements NiFiServer {
props,
authorizer,
auditService,
+ componentMetricReporter,
encryptor,
bulletinRepository,
extensionManager,
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 694d8102dd..d3ceb16689 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -32,6 +32,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.state.MockStateManager;
import org.junit.jupiter.api.Assertions;
@@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
private final SharedSessionState sharedState;
private final Map<String, Long> counterMap = new HashMap<>();
+ private final Map<String, List<Double>> namedGaugeValues = new HashMap<>();
private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();
private final Set<FlowFile> writeRecursionSet = new HashSet<>();
private final MockProvenanceReporter provenanceReporter;
@@ -154,6 +156,19 @@ public class MockProcessSession implements ProcessSession {
counterMap.put(name, counter);
}
+ @Override
+ public void recordGauge(final String name, final double value, final
CommitTiming commitTiming) {
+ if (CommitTiming.NOW == commitTiming) {
+ sharedState.recordGauge(name, value);
+ } else {
+ namedGaugeValues.compute(name, (gaugeName, values) -> {
+ final List<Double> gaugeValues =
Objects.requireNonNullElseGet(values, ArrayList::new);
+ gaugeValues.add(value);
+ return gaugeValues;
+ });
+ }
+ }
+
@Override
public void migrate(final ProcessSession newOwner) {
migrate(newOwner, new ArrayList<>((Collection)
currentVersions.values()));
@@ -327,6 +342,14 @@ public class MockProcessSession implements ProcessSession {
sharedState.adjustCounter(entry.getKey(), entry.getValue());
}
+ for (final Map.Entry<String, List<Double>> namedGaugeEntry :
namedGaugeValues.entrySet()) {
+ final String name = namedGaugeEntry.getKey();
+ final List<Double> gaugeValues = namedGaugeEntry.getValue();
+ for (final Double gaugeValue : gaugeValues) {
+ sharedState.recordGauge(name, gaugeValue);
+ }
+ }
+
sharedState.addProvenanceEvents(provenanceReporter.getEvents());
provenanceReporter.clear();
counterMap.clear();
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
index e28390a2f7..1b65aa8b02 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@ -23,6 +23,7 @@ import org.apache.nifi.provenance.ProvenanceReporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -36,6 +37,7 @@ public class SharedSessionState {
private final Processor processor;
private final AtomicLong flowFileIdGenerator;
private final ConcurrentMap<String, AtomicLong> counterMap = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<String, List<Double>> namedGaugeValues = new
ConcurrentHashMap<>();
// list of provenance events as they were in the provenance repository
(events emitted with force=true or committed with the session)
private final List<ProvenanceEventRecord> events = new ArrayList<>();
@@ -87,4 +89,16 @@ public class SharedSessionState {
final AtomicLong counterValue = counterMap.get(name);
return counterValue == null ? null : counterValue.get();
}
+
+ public void recordGauge(final String name, final double value) {
+ namedGaugeValues.compute(name, (gaugeName, values) -> {
+ final List<Double> gaugeValues =
Objects.requireNonNullElseGet(values, ArrayList::new);
+ gaugeValues.add(value);
+ return gaugeValues;
+ });
+ }
+
+ public List<Double> getGaugeValues(final String name) {
+ return namedGaugeValues.getOrDefault(name, List.of());
+ }
}
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 433c52172c..3e62d0ef65 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -580,6 +580,11 @@ public class StandardProcessorTestRunner implements
TestRunner {
return sharedState.getCounterValue(name);
}
+ @Override
+ public List<Double> getGaugeValues(final String name) {
+ return sharedState.getGaugeValues(name);
+ }
+
@Override
public int getRemovedCount() {
int count = 0;
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index c975f90add..93c51f841e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -562,6 +562,14 @@ public interface TestRunner {
*/
Long getCounterValue(String name);
+ /**
+ * Get list of values recorded for the named Gauge
+ *
+ * @param name Gauge Name
+ * @return List of recorded values or empty when the named Gauge was not
used
+ */
+ List<Double> getGaugeValues(String name);
+
/**
* @return the number of FlowFiles that have been removed from the system
*/
diff --git
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
index 2befa6a280..bd838330a4 100644
---
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
+++
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
@@ -29,12 +29,11 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.metrics.CommitTiming;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -49,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.fail;
public class TestStandardProcessorTestRunner {
+ private static final String NAMED_GAUGE = "Processing Time";
+ private static final double NAMED_GAUGE_VALUE = 120.35;
+
@Test
public void testProcessContextPassedToOnStoppedMethods() {
final ProcessorWithOnStop proc = new ProcessorWithOnStop();
@@ -107,6 +109,19 @@ public class TestStandardProcessorTestRunner {
runner.assertAllConditionsMet("success", either);
}
+ @Test
+ public void testRecordGauge() {
+ final RecordGaugeProcessor processor = new RecordGaugeProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+
+ runner.run();
+
+ final List<Double> gaugeValues = runner.getGaugeValues(NAMED_GAUGE);
+ assertFalse(gaugeValues.isEmpty());
+ final Double firstValue = gaugeValues.getFirst();
+ assertEquals(NAMED_GAUGE_VALUE, firstValue);
+ }
+
@Test
public void testNumThreads() {
final ProcessorWithOnStop proc = new ProcessorWithOnStop();
@@ -157,8 +172,7 @@ public class TestStandardProcessorTestRunner {
@Test
public void testControllerServiceUpdateShouldCallOnSetProperty() {
- // Arrange
- final ControllerService testService = new SimpleTestService();
+ final SimpleTestService testService = new SimpleTestService();
final AddAttributeProcessor proc = new AddAttributeProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
final String serviceIdentifier = "test";
@@ -170,12 +184,10 @@ public class TestStandardProcessorTestRunner {
fail(e.getMessage());
}
- assertFalse(((SimpleTestService) testService).isOpmCalled(),
"onPropertyModified has been called");
+ assertFalse(testService.isOpmCalled(), "onPropertyModified has been
called");
- // Act
ValidationResult vr = runner.setProperty(testService, pdName, pdValue);
- // Assert
assertTrue(vr.isValid());
ControllerServiceConfiguration csConf = ((MockProcessContext)
runner.getProcessContext()).getConfiguration(serviceIdentifier);
@@ -183,7 +195,7 @@ public class TestStandardProcessorTestRunner {
String retrievedPDValue =
csConf.getProperties().get(propertyDescriptor);
assertEquals(pdValue, retrievedPDValue);
- assertTrue(((SimpleTestService) testService).isOpmCalled(),
"onPropertyModified has not been called");
+ assertTrue(testService.isOpmCalled(), "onPropertyModified has not been
called");
}
@Test
@@ -221,6 +233,13 @@ public class TestStandardProcessorTestRunner {
runner.assertValid();
}
+ private static class RecordGaugeProcessor extends AbstractProcessor {
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) {
+ session.recordGauge(NAMED_GAUGE, NAMED_GAUGE_VALUE,
CommitTiming.NOW);
+ }
+ }
+
private static class ProcessorWithOnStop extends AbstractProcessor {
private int callsWithContext = 0;
@@ -260,10 +279,7 @@ public class TestStandardProcessorTestRunner {
@Override
protected void init(final ProcessorInitializationContext context) {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
+ this.relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
@Override
@@ -299,10 +315,7 @@ public class TestStandardProcessorTestRunner {
private final Set<Relationship> relationships;
public GoodProcessor() {
- final Set<Relationship> r = new HashSet<>();
- r.add(REL_SUCCESS);
- r.add(REL_FAILURE);
- relationships = Collections.unmodifiableSet(r);
+ relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
@Override
@@ -321,7 +334,7 @@ public class TestStandardProcessorTestRunner {
private static class SimpleTestService extends AbstractControllerService {
private final String PD_NAME = "name";
- private PropertyDescriptor namePropertyDescriptor = new
PropertyDescriptor.Builder()
+ private final PropertyDescriptor namePropertyDescriptor = new
PropertyDescriptor.Builder()
.name(PD_NAME)
.displayName("Controller Service Name")
.required(false)
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 84b5051f4e..1aefe1db79 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -24,6 +24,8 @@ import org.apache.nifi.asset.StandardAssetManager;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
+import org.apache.nifi.controller.metrics.DefaultComponentMetricReporter;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.ContentRepositoryContext;
import org.apache.nifi.controller.repository.CounterRepository;
@@ -207,6 +209,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
};
final CounterRepository counterRepo = new
StandardCounterRepository();
+ final ComponentMetricReporter componentMetricReporter = new
DefaultComponentMetricReporter();
final File krb5File = engineConfiguration.getKrb5File();
final KerberosConfig kerberosConfig = new KerberosConfig(null,
null, krb5File);
@@ -244,7 +247,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
flowFileRepo = new StatelessFlowFileRepository();
final RepositoryContextFactory repositoryContextFactory = new
StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
- counterRepo, stateManagerProvider);
+ counterRepo, componentMetricReporter, stateManagerProvider);
final StatelessEngineInitializationContext
statelessEngineInitializationContext = new
StatelessEngineInitializationContext(controllerServiceProvider, flowManager,
processContextFactory,
repositoryContextFactory);
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
index a354ddec95..f201c93535 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.stateless.repository;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.repository.AbstractRepositoryContext;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
@@ -34,10 +35,18 @@ import java.util.concurrent.atomic.AtomicLong;
public class StatelessRepositoryContext extends AbstractRepositoryContext
implements RepositoryContext {
private final ContentRepository contentRepository;
- public StatelessRepositoryContext(final Connectable connectable, final
AtomicLong connectionIndex, final ContentRepository contentRepository, final
FlowFileRepository flowFileRepository,
- final FlowFileEventRepository
flowFileEventRepository, final CounterRepository counterRepository, final
ProvenanceEventRepository provenanceRepository,
- final StateManager stateManager) {
- super(connectable, connectionIndex, contentRepository,
flowFileRepository, flowFileEventRepository, counterRepository,
provenanceRepository, stateManager);
+ public StatelessRepositoryContext(
+ final Connectable connectable,
+ final AtomicLong connectionIndex,
+ final ContentRepository contentRepository,
+ final FlowFileRepository flowFileRepository,
+ final FlowFileEventRepository flowFileEventRepository,
+ final CounterRepository counterRepository,
+ final ComponentMetricReporter componentMetricReporter,
+ final ProvenanceEventRepository provenanceRepository,
+ final StateManager stateManager
+ ) {
+ super(connectable, connectionIndex, contentRepository,
flowFileRepository, flowFileEventRepository, counterRepository,
componentMetricReporter, provenanceRepository, stateManager);
this.contentRepository = contentRepository;
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
index 2a3e135d0e..c276288fab 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContextFactory.java
@@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.metrics.ComponentMetricReporter;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -40,14 +41,22 @@ public class StatelessRepositoryContextFactory implements
RepositoryContextFacto
private final FlowFileRepository flowFileRepository;
private final FlowFileEventRepository flowFileEventRepository;
private final CounterRepository counterRepository;
+ private final ComponentMetricReporter componentMetricReporter;
private final StateManagerProvider stateManagerProvider;
- public StatelessRepositoryContextFactory(final ContentRepository
contentRepository, final FlowFileRepository flowFileRepository, final
FlowFileEventRepository flowFileEventRepository,
- final CounterRepository
counterRepository, final StateManagerProvider stateManagerProvider) {
+ public StatelessRepositoryContextFactory(
+ final ContentRepository contentRepository,
+ final FlowFileRepository flowFileRepository,
+ final FlowFileEventRepository flowFileEventRepository,
+ final CounterRepository counterRepository,
+ final ComponentMetricReporter componentMetricReporter,
+ final StateManagerProvider stateManagerProvider
+ ) {
this.contentRepository = contentRepository;
this.flowFileRepository = flowFileRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.counterRepository = counterRepository;
+ this.componentMetricReporter = componentMetricReporter;
this.stateManagerProvider = stateManagerProvider;
}
@@ -58,7 +67,7 @@ public class StatelessRepositoryContextFactory implements
RepositoryContextFacto
: null;
final StateManager stateManager =
stateManagerProvider.getStateManager(connectable.getIdentifier(),
componentClass);
return new StatelessRepositoryContext(connectable, new AtomicLong(0L),
contentRepository, flowFileRepository,
- flowFileEventRepository, counterRepository,
provenanceEventRepository, stateManager);
+ flowFileEventRepository, counterRepository,
componentMetricReporter, provenanceEventRepository, stateManager);
}
@Override
diff --git a/nifi-system-tests/pom.xml
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter-nar/pom.xml
similarity index 63%
copy from nifi-system-tests/pom.xml
copy to
nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter-nar/pom.xml
index a243ec5a8b..7d22e36b60 100644
--- a/nifi-system-tests/pom.xml
+++
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter-nar/pom.xml
@@ -16,24 +16,19 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>nifi</artifactId>
<groupId>org.apache.nifi</groupId>
+
<artifactId>nifi-system-test-component-metric-reporter-bundle</artifactId>
<version>2.8.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-system-tests</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>nifi-system-test-authorizer-bundle</module>
- <module>nifi-system-test-extensions-bundle</module>
- <module>nifi-system-test-extensions2-bundle</module>
- <module>nifi-system-test-flow-action-reporter-bundle</module>
- <module>nifi-alternate-config-extensions-bundle</module>
- <module>nifi-system-test-nar-provider-bundles</module>
- <module>nifi-python-test-extensions-nar</module>
- <module>nifi-system-test-suite</module>
- <module>nifi-stateless-system-test-suite</module>
- </modules>
+ <artifactId>nifi-system-test-component-metric-reporter-nar</artifactId>
+ <packaging>nar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-system-test-component-metric-reporter</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/nifi-system-tests/pom.xml
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/pom.xml
similarity index 63%
copy from nifi-system-tests/pom.xml
copy to
nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/pom.xml
index a243ec5a8b..80cdc0ead8 100644
--- a/nifi-system-tests/pom.xml
+++
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/pom.xml
@@ -16,24 +16,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>nifi</artifactId>
<groupId>org.apache.nifi</groupId>
+
<artifactId>nifi-system-test-component-metric-reporter-bundle</artifactId>
<version>2.8.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-system-tests</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>nifi-system-test-authorizer-bundle</module>
- <module>nifi-system-test-extensions-bundle</module>
- <module>nifi-system-test-extensions2-bundle</module>
- <module>nifi-system-test-flow-action-reporter-bundle</module>
- <module>nifi-alternate-config-extensions-bundle</module>
- <module>nifi-system-test-nar-provider-bundles</module>
- <module>nifi-python-test-extensions-nar</module>
- <module>nifi-system-test-suite</module>
- <module>nifi-stateless-system-test-suite</module>
- </modules>
+ <artifactId>nifi-system-test-component-metric-reporter</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-framework-api</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/java/org/apache/nifi/controller/metrics/SystemTestComponentMetricReporter.java
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/java/org/apache/nifi/controller/metrics/SystemTestComponentMetricReporter.java
new file mode 100644
index 0000000000..a0cd473c16
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/java/org/apache/nifi/controller/metrics/SystemTestComponentMetricReporter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+/**
+ * System Test implementation of Component Metric Reporter
+ */
+public class SystemTestComponentMetricReporter implements
ComponentMetricReporter {
+ private static final String USER_DIR_PROPERTY = "user.dir";
+
+ private static final Path USER_DIRECTORY =
Paths.get(System.getProperty(USER_DIR_PROPERTY));
+
+ private static final String LOG_FILE_PREFIX =
SystemTestComponentMetricReporter.class.getName();
+
+ private static final Logger logger =
LoggerFactory.getLogger(SystemTestComponentMetricReporter.class);
+
+ @Override
+ public void recordGauge(final GaugeRecord gaugeRecord) {
+ logger.info("Recording Gauge [{}] Value [{}]", gaugeRecord.name(),
gaugeRecord.value());
+
+ final ComponentMetricContext componentMetricContext =
gaugeRecord.componentMetricContext();
+ final String formatted = "Gauge [%s] Value [%s] Component ID
[%s]".formatted(gaugeRecord.name(), gaugeRecord.value(),
componentMetricContext.id());
+ final String filename =
"%s.GaugeRecord.%d.log".formatted(LOG_FILE_PREFIX, System.nanoTime());
+ final Path log = USER_DIRECTORY.resolve(filename);
+
+ try {
+ Files.writeString(log, formatted);
+ } catch (final Exception e) {
+ logger.warn("Failed to write Gauge Record [{}]", log, e);
+ }
+ }
+
+ @Override
+ public void recordCounter(final CounterRecord counterRecord) {
+ logger.info("Recording Counter [{}] Value [{}]", counterRecord.name(),
counterRecord.value());
+
+ final ComponentMetricContext componentMetricContext =
counterRecord.componentMetricContext();
+ final String formatted = "Counter [%s] Value [%s] Component ID
[%s]".formatted(counterRecord.name(), counterRecord.value(),
componentMetricContext.id());
+ final String filename =
"%s.CounterRecord.%d.log".formatted(LOG_FILE_PREFIX, System.nanoTime());
+ final Path log = USER_DIRECTORY.resolve(filename);
+
+ try {
+ Files.writeString(log, formatted);
+ } catch (final Exception e) {
+ logger.warn("Failed to write Counter Record [{}]", log, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.info("Clearing Component Metrics from User Directory [{}]",
USER_DIRECTORY);
+ try (Stream<Path> userDirectoryPaths = Files.list(USER_DIRECTORY)) {
+ userDirectoryPaths
+ .filter(path ->
path.getFileName().toString().startsWith(LOG_FILE_PREFIX))
+ .forEach(log -> {
+ try {
+ Files.delete(log);
+ } catch (final Exception e) {
+ logger.warn("Delete Component Metrics Log [{}]
failed", log, e);
+ }
+ });
+ } catch (final Exception e) {
+ logger.warn("Failed to clear User Directory [{}]", USER_DIRECTORY,
e);
+ }
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/resources/META-INF/services/org.apache.nifi.controller.metrics.ComponentMetricReporter
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/resources/META-INF/services/org.apache.nifi.controller.metrics.ComponentMetricReporter
new file mode 100644
index 0000000000..451db89522
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/nifi-system-test-component-metric-reporter/src/main/resources/META-INF/services/org.apache.nifi.controller.metrics.ComponentMetricReporter
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.metrics.SystemTestComponentMetricReporter
diff --git a/nifi-system-tests/pom.xml
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/pom.xml
similarity index 66%
copy from nifi-system-tests/pom.xml
copy to
nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/pom.xml
index a243ec5a8b..08b8a273f5 100644
--- a/nifi-system-tests/pom.xml
+++
b/nifi-system-tests/nifi-system-test-component-metric-reporter-bundle/pom.xml
@@ -16,24 +16,16 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>nifi</artifactId>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-system-tests</artifactId>
<version>2.8.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-system-tests</artifactId>
+ <artifactId>nifi-system-test-component-metric-reporter-bundle</artifactId>
<packaging>pom</packaging>
<modules>
- <module>nifi-system-test-authorizer-bundle</module>
- <module>nifi-system-test-extensions-bundle</module>
- <module>nifi-system-test-extensions2-bundle</module>
- <module>nifi-system-test-flow-action-reporter-bundle</module>
- <module>nifi-alternate-config-extensions-bundle</module>
- <module>nifi-system-test-nar-provider-bundles</module>
- <module>nifi-python-test-extensions-nar</module>
- <module>nifi-system-test-suite</module>
- <module>nifi-stateless-system-test-suite</module>
+ <module>nifi-system-test-component-metric-reporter</module>
+ <module>nifi-system-test-component-metric-reporter-nar</module>
</modules>
-
</project>
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateMetric.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateMetric.java
new file mode 100644
index 0000000000..93201461a9
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateMetric.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.metrics.CommitTiming;
+
+import java.util.Set;
+
+public class UpdateMetric extends AbstractProcessor {
+
+ private static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .autoTerminateDefault(true)
+ .build();
+
+ private static final Set<Relationship> RELATIONSHIPS = Set.of(SUCCESS);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ session.adjustCounter("onTrigger", 1, true);
+
+ final Runtime runtime = Runtime.getRuntime();
+ final long freeMemory = runtime.freeMemory();
+ session.recordGauge("freeMemory", freeMemory, CommitTiming.NOW);
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ session.transfer(flowFile, SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d0a01b38b5..a12f954cb8 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -54,6 +54,7 @@ org.apache.nifi.processors.tests.system.TransferBatch
org.apache.nifi.processors.tests.system.ThrowExceptionInFlowFileFilter
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.UpdateContent
+org.apache.nifi.processors.tests.system.UpdateMetric
org.apache.nifi.processors.tests.system.UnzipFlowFile
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
diff --git a/nifi-system-tests/nifi-system-test-suite/pom.xml
b/nifi-system-tests/nifi-system-test-suite/pom.xml
index 942145b745..9f10974428 100644
--- a/nifi-system-tests/nifi-system-test-suite/pom.xml
+++ b/nifi-system-tests/nifi-system-test-suite/pom.xml
@@ -366,6 +366,12 @@
<version>2.8.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+
<artifactId>nifi-system-test-component-metric-reporter-nar</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-provider-assembly</artifactId>
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
index ddcc94c7df..a6f3c31efa 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/assembly/dependencies.xml
@@ -69,6 +69,7 @@
<include>*:nifi-system-test-extensions-services-api-nar</include>
<include>*:nifi-system-test-extensions2-nar</include>
<include>*:nifi-system-test-flow-action-reporter-nar</include>
+
<include>*:nifi-system-test-component-metric-reporter-nar</include>
<include>*:nifi-jetty-nar</include>
<include>*:nifi-framework-nar</include>
<include>*:nifi-framework-zookeeper-nar</include>
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/metrics/ComponentMetricReporterIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/metrics/ComponentMetricReporterIT.java
new file mode 100644
index 0000000000..fa13bbbeac
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/metrics/ComponentMetricReporterIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.metrics;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ComponentMetricReporterIT extends NiFiSystemIT {
+
+ private static final String REPORTER_IMPLEMENTATION =
"nifi.component.metric.reporter.implementation";
+
+ private static final String REPORTER_CLASS =
"org.apache.nifi.controller.metrics.SystemTestComponentMetricReporter";
+
+ private static final String GAUGE_RECORD_LOG =
"%s.GaugeRecord".formatted(REPORTER_CLASS);
+
+ private static final String COUNTER_RECORD_LOG =
"%s.CounterRecord".formatted(REPORTER_CLASS);
+
+ @Override
+ protected Map<String, String> getNifiPropertiesOverrides() {
+ return Map.of(REPORTER_IMPLEMENTATION, REPORTER_CLASS);
+ }
+
+ @Test
+ void testUpdateMetricReported() throws NiFiClientException, IOException,
InterruptedException {
+ final ProcessorEntity updateMetric =
getClientUtil().createProcessor("UpdateMetric");
+ getNifiClient().getProcessorClient().runProcessorOnce(updateMetric);
+ getClientUtil().waitForStoppedProcessor(updateMetric.getId());
+
+ final String componentId = updateMetric.getId();
+ assertLogComponentIdFound(GAUGE_RECORD_LOG, componentId);
+ assertLogComponentIdFound(COUNTER_RECORD_LOG, componentId);
+ }
+
+ private Optional<Path> findReportedLog(final String fileNameSearch) throws
IOException {
+ final Path instanceDirectory =
getNiFiInstance().getInstanceDirectory().toPath();
+ try (Stream<Path> userDirectoryPaths = Files.list(instanceDirectory)) {
+ return userDirectoryPaths.filter(path ->
path.getFileName().toString().startsWith(fileNameSearch)).findFirst();
+ }
+ }
+
+ private void assertLogComponentIdFound(final String fileNameSearch, final
String componentId) throws IOException {
+ final Optional<Path> reportedLogFound =
findReportedLog(fileNameSearch);
+ assertTrue(reportedLogFound.isPresent(), "Component Metric Reporter
[%s] log not found".formatted(fileNameSearch));
+
+ final Path reportedLog = reportedLogFound.get();
+ final String log = Files.readString(reportedLog);
+
+ assertTrue(log.contains(componentId), "Update Metric ID [%s] not found
in log [%s]".formatted(componentId, log));
+ }
+}
diff --git a/nifi-system-tests/pom.xml b/nifi-system-tests/pom.xml
index a243ec5a8b..3c58537dc8 100644
--- a/nifi-system-tests/pom.xml
+++ b/nifi-system-tests/pom.xml
@@ -26,6 +26,7 @@
<modules>
<module>nifi-system-test-authorizer-bundle</module>
+ <module>nifi-system-test-component-metric-reporter-bundle</module>
<module>nifi-system-test-extensions-bundle</module>
<module>nifi-system-test-extensions2-bundle</module>
<module>nifi-system-test-flow-action-reporter-bundle</module>