This is an automated email from the ASF dual-hosted git repository.

ferdei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 83b701a25e NIFI-13635 Expose processor bulletins as a part of FlowInfo
83b701a25e is described below

commit 83b701a25ea2820d20768ca4946f5698249beb0a
Author: Peter Kedvessy <[email protected]>
AuthorDate: Tue Aug 6 23:37:56 2024 +0200

    NIFI-13635 Expose processor bulletins as a part of FlowInfo
    
    Signed-off-by: Ferenc Erdei <[email protected]>
    This closes #9161.
---
 .../org/apache/nifi/c2/client/C2ClientConfig.java  |  12 ++
 .../nifi/c2/client/service/C2HeartbeatFactory.java |   7 +-
 .../client/service/model/RuntimeInfoWrapper.java   |  10 +-
 .../c2/client/service/C2HeartbeatFactoryTest.java  |  15 ++-
 .../DescribeManifestOperationHandlerTest.java      |   2 +-
 .../org/apache/nifi/c2/protocol/api/FlowInfo.java  |  11 ++
 .../nifi/c2/protocol/api/ProcessorBulletin.java    | 148 +++++++++++++++++++++
 .../nifi/minifi/commons/api/MiNiFiProperties.java  |   1 +
 .../apache/nifi/minifi/c2/C2NifiClientService.java |  54 +++++++-
 .../src/main/resources/conf/bootstrap.conf         |   3 +-
 10 files changed, 250 insertions(+), 13 deletions(-)

diff --git 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index d9cd3aa90f..070cc02d16 100644
--- 
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++ 
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -58,6 +58,7 @@ public class C2ClientConfig {
     private final String c2RequestCompression;
     private final String c2AssetDirectory;
     private final long bootstrapAcknowledgeTimeout;
+    private final int c2FlowInfoProcessorBulletinLimit;
 
     private C2ClientConfig(final Builder builder) {
         this.c2Url = builder.c2Url;
@@ -88,6 +89,7 @@ public class C2ClientConfig {
         this.c2RequestCompression = builder.c2RequestCompression;
         this.c2AssetDirectory = builder.c2AssetDirectory;
         this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
+        this.c2FlowInfoProcessorBulletinLimit = 
builder.c2FlowInfoProcessorBulletinLimit;
     }
 
     public String getC2Url() {
@@ -202,6 +204,10 @@ public class C2ClientConfig {
         return bootstrapAcknowledgeTimeout;
     }
 
+    public int getC2FlowInfoProcessorBulletinLimit() {
+        return c2FlowInfoProcessorBulletinLimit;
+    }
+
     /**
      * Builder for client configuration.
      */
@@ -238,6 +244,7 @@ public class C2ClientConfig {
         private String c2RequestCompression;
         private String c2AssetDirectory;
         private long bootstrapAcknowledgeTimeout;
+        private int c2FlowInfoProcessorBulletinLimit;
 
         public Builder c2Url(String c2Url) {
             this.c2Url = c2Url;
@@ -389,6 +396,11 @@ public class C2ClientConfig {
             return this;
         }
 
+        public Builder c2FlowInfoProcessorBulletinLimit(int 
c2FlowInfoProcessorBulletinLimit) {
+            this.c2FlowInfoProcessorBulletinLimit = 
c2FlowInfoProcessorBulletinLimit;
+            return this;
+        }
+
         public C2ClientConfig build() {
             return new C2ClientConfig(this);
         }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index 06549b5663..653d9ba48f 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -35,6 +35,7 @@ import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -47,6 +48,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
 import org.apache.nifi.c2.protocol.api.AgentStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.protocol.api.ResourceInfo;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.DeviceInfo;
@@ -89,7 +91,7 @@ public class C2HeartbeatFactory {
 
         
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), 
runtimeInfoWrapper.getManifest()));
         heartbeat.setDeviceInfo(generateDeviceInfo());
-        
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
+        heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), 
runtimeInfoWrapper.getProcessorBulletins()));
         heartbeat.setCreated(System.currentTimeMillis());
 
         ResourceInfo resourceInfo = new ResourceInfo();
@@ -99,9 +101,10 @@ public class C2HeartbeatFactory {
         return heartbeat;
     }
 
-    private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus) {
+    private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, 
List<ProcessorBulletin> processorBulletins) {
         FlowInfo flowInfo = new FlowInfo();
         flowInfo.setQueues(queueStatus);
+        flowInfo.setProcessorBulletins(processorBulletins);
         
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
         return flowInfo;
     }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
index c017ac0d11..b7653ea424 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
@@ -16,20 +16,24 @@
  */
 package org.apache.nifi.c2.client.service.model;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
 
 public class RuntimeInfoWrapper {
     final AgentRepositories repos;
     final RuntimeManifest manifest;
     final Map<String, FlowQueueStatus> queueStatus;
+    final List<ProcessorBulletin> processorBulletins;
 
-    public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest 
manifest, Map<String, FlowQueueStatus> queueStatus) {
+    public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest 
manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> 
processorBulletins) {
         this.repos = repos;
         this.manifest = manifest;
         this.queueStatus = queueStatus;
+        this.processorBulletins = processorBulletins;
     }
 
     public AgentRepositories getAgentRepositories() {
@@ -43,4 +47,8 @@ public class RuntimeInfoWrapper {
     public Map<String, FlowQueueStatus> getQueueStatus() {
         return queueStatus;
     }
+
+    public List<ProcessorBulletin> getProcessorBulletins() {
+        return processorBulletins;
+    }
 }
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
index 4fb20aeb86..ceaa42d4fa 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -24,9 +24,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -37,6 +39,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.protocol.api.OperationType;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.protocol.api.SupportedOperation;
 import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
 import org.apache.nifi.c2.protocol.component.api.Bundle;
@@ -117,12 +120,14 @@ public class C2HeartbeatFactoryTest {
         AgentRepositories repos = new AgentRepositories();
         RuntimeManifest manifest = createManifest();
         Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+        List<ProcessorBulletin> processorBulletins = new ArrayList<>();
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
 
         assertEquals(repos, 
heartbeat.getAgentInfo().getStatus().getRepositories());
         assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
         assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+        assertEquals(processorBulletins, 
heartbeat.getFlowInfo().getProcessorBulletins());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
     }
 
@@ -134,12 +139,14 @@ public class C2HeartbeatFactoryTest {
         AgentRepositories repos = new AgentRepositories();
         RuntimeManifest manifest = createManifest();
         Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+        List<ProcessorBulletin> processorBulletins = new ArrayList<>();
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
 
         assertEquals(repos, 
heartbeat.getAgentInfo().getStatus().getRepositories());
         assertNull(heartbeat.getAgentInfo().getAgentManifest());
         assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+        assertEquals(processorBulletins, 
heartbeat.getFlowInfo().getProcessorBulletins());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
     }
 
@@ -156,7 +163,7 @@ public class C2HeartbeatFactoryTest {
         when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), 
Collections.emptySet())).thenReturn(MANIFEST_HASH);
         
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>()));
 
         assertEquals(MANIFEST_HASH, 
heartbeat.getAgentInfo().getAgentManifestHash());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
@@ -172,7 +179,7 @@ public class C2HeartbeatFactoryTest {
         when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), 
supportedOperations)).thenReturn(MANIFEST_HASH);
         
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
 
-        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new 
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new 
ArrayList<>()));
 
         assertEquals(MANIFEST_HASH, 
heartbeat.getAgentInfo().getAgentManifestHash());
         assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
diff --git 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
index f4508e7fed..24bcc0700a 100644
--- 
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
+++ 
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
@@ -58,7 +58,7 @@ public class DescribeManifestOperationHandlerTest {
     void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
         RuntimeManifest manifest = new RuntimeManifest();
         manifest.setIdentifier("manifestId");
-        RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, 
manifest, null);
+        RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, 
manifest, null, null);
 
         C2Heartbeat heartbeat = new C2Heartbeat();
         AgentInfo agentInfo = new AgentInfo();
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
index b6c9cc93ec..f2f683c1a3 100644
--- 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
 import io.swagger.v3.oas.annotations.media.Schema;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 public class FlowInfo implements Serializable {
@@ -29,6 +30,7 @@ public class FlowInfo implements Serializable {
     private FlowUri flowUri;
     private Map<String, ComponentStatus> components;
     private Map<String, FlowQueueStatus> queues;
+    private List<ProcessorBulletin> processorBulletins;
 
     @Schema(description = "A unique identifier of the flow currently deployed 
on the agent")
     public String getFlowId() {
@@ -66,4 +68,13 @@ public class FlowInfo implements Serializable {
         this.queues = queues;
     }
 
+    @Schema(description = "Bulletins of each processors")
+    public List<ProcessorBulletin> getProcessorBulletins() {
+        return processorBulletins;
+    }
+
+    public void setProcessorBulletins(List<ProcessorBulletin> 
processorBulletins) {
+        this.processorBulletins = processorBulletins;
+    }
+
 }
diff --git 
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
new file mode 100644
index 0000000000..a10ba225d4
--- /dev/null
+++ 
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.protocol.api;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.io.Serializable;
+import java.util.Date;
+
+public class ProcessorBulletin implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Date timestamp;
+    private long id;
+    private String nodeAddress;
+    private String level;
+    private String category;
+    private String message;
+    private String groupId;
+    private String groupName;
+    private String groupPath;
+    private String sourceId;
+    private String sourceName;
+    private String flowFileUuid;
+
+    @Schema(description = "When this bulletin was generated")
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Date timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Schema(description = "The id of the bulletin")
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    @Schema(description = "If clustered, the address of the node from which 
the bulletin originated")
+    public String getNodeAddress() {
+        return nodeAddress;
+    }
+
+    public void setNodeAddress(String nodeAddress) {
+        this.nodeAddress = nodeAddress;
+    }
+
+    @Schema(description = "The level of the bulletin")
+    public String getLevel() {
+        return level;
+    }
+
+    public void setLevel(String level) {
+        this.level = level;
+    }
+
+    @Schema(description = "The category of this bulletin")
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    @Schema(description = "The bulletin message")
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    @Schema(description = "The group id of the source component")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    @Schema(description = "The group name of the source component")
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    @Schema(description = "The group path of the source component")
+    public String getGroupPath() {
+        return groupPath;
+    }
+
+    public void setGroupPath(String groupPath) {
+        this.groupPath = groupPath;
+    }
+
+    @Schema(description = "The id of the source component")
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    @Schema(description = "The name of the source component")
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    @Schema(description = "The id of the flow file")
+    public String getFlowFileUuid() {
+        return flowFileUuid;
+    }
+
+    public void setFlowFileUuid(String flowFileUuid) {
+        this.flowFileUuid = flowFileUuid;
+    }
+}
diff --git 
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
 
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
index b346eed20b..c40d2be198 100644
--- 
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
+++ 
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
@@ -92,6 +92,7 @@ public enum MiNiFiProperties {
     C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, 
false, VALID),
     C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, 
VALID),
     C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 
sec", false, true, VALID),
+    
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", 
"1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ),
     NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, 
false, true, VALID),
     
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path",
 null, false, true, VALID),
     
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds",
 null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 677798976a..02366147d5 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -44,6 +44,7 @@ import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_PATH_H
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_READ_TIMEOUT;
 import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL_ACK;
+import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_TYPE;
 import static 
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION;
@@ -59,11 +60,14 @@ import static 
org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.bootstrap.BootstrapCommunicator;
 import org.apache.nifi.c2.client.C2ClientConfig;
@@ -90,6 +94,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
 import org.apache.nifi.c2.serializer.C2JacksonSerializer;
 import org.apache.nifi.c2.serializer.C2Serializer;
 import org.apache.nifi.controller.FlowController;
@@ -113,6 +118,8 @@ import 
org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
 import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
 import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
 import org.apache.nifi.nar.ExtensionManagerHolder;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
@@ -173,8 +180,9 @@ public class C2NifiClientService {
 
         this.c2OperationManager = new C2OperationManager(
             client, c2OperationHandlerProvider, heartbeatLock, 
operationQueueDAO, c2OperationRestartHandler);
+        Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> 
generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit());
         this.c2HeartbeatManager = new C2HeartbeatManager(
-            client, heartbeatFactory, heartbeatLock, 
this::generateRuntimeInfo, c2OperationManager);
+            client, heartbeatFactory, heartbeatLock, 
runtimeInfoWrapperSupplier, c2OperationManager);
     }
 
     private C2ClientConfig generateClientConfig(NiFiProperties properties) {
@@ -206,6 +214,8 @@ public class C2NifiClientService {
             
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), 
C2_REST_PATH_HEARTBEAT.getDefaultValue()))
             
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(),
 C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
             
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties, 
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
+            .c2FlowInfoProcessorBulletinLimit(parseInt(properties
+                    
.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(), 
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())))
             .build();
     }
 
@@ -231,10 +241,12 @@ public class C2NifiClientService {
         UpdateConfigurationStrategy updateConfigurationStrategy = new 
DefaultUpdateConfigurationStrategy(flowController, flowService,
             new StandardFlowEnrichService(niFiProperties), 
flowPropertyEncryptor,
             StandardFlowSerDeService.defaultInstance(), 
niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
+        Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () -> 
generateRuntimeInfo(
+                
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(),
 C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())));
 
         return new C2OperationHandlerProvider(List.of(
             new UpdateConfigurationOperationHandler(client, flowIdHolder, 
updateConfigurationStrategy, emptyOperandPropertiesProvider),
-            new DescribeManifestOperationHandler(heartbeatFactory, 
this::generateRuntimeInfo, emptyOperandPropertiesProvider),
+            new DescribeManifestOperationHandler(heartbeatFactory, 
runtimeInfoWrapperSupplier, emptyOperandPropertiesProvider),
             TransferDebugOperationHandler.create(client, 
emptyOperandPropertiesProvider,
                 transferDebugCommandHelper.debugBundleFiles(), 
transferDebugCommandHelper::excludeSensitiveText),
             UpdateAssetOperationHandler.create(client, 
emptyOperandPropertiesProvider,
@@ -271,10 +283,10 @@ public class C2NifiClientService {
         }
     }
 
-    private synchronized RuntimeInfoWrapper generateRuntimeInfo() {
+    private synchronized RuntimeInfoWrapper generateRuntimeInfo(int 
processorBulletinLimit) {
         AgentManifest agentManifest = new 
AgentManifest(runtimeManifestService.getManifest());
         
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
-        return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, 
getQueueStatus());
+        return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest, 
getQueueStatus(), getBulletins(processorBulletinLimit));
     }
 
     private AgentRepositories getAgentRepositories() {
@@ -321,4 +333,38 @@ public class C2NifiClientService {
             })
             .collect(toMap(Pair::getKey, Pair::getValue));
     }
+
+    private List<ProcessorBulletin> getBulletins(int processorBulletinLimit) {
+        if (processorBulletinLimit > 0) {
+            String groupId = flowController.getEventAccess()
+                    .getGroupStatus(ROOT_GROUP_ID)
+                    .getId();
+            BulletinQuery query = new BulletinQuery.Builder()
+                    .sourceType(ComponentType.PROCESSOR)
+                    .groupIdMatches(groupId)
+                    .limit(processorBulletinLimit)
+                    .build();
+
+            return flowController.getBulletinRepository()
+                    .findBulletins(query)
+                    .stream()
+                    .map(bulletin -> {
+                        ProcessorBulletin processorBulletin = new 
ProcessorBulletin();
+                        processorBulletin.setCategory(bulletin.getCategory());
+                        
processorBulletin.setFlowFileUuid(bulletin.getFlowFileUuid());
+                        processorBulletin.setGroupId(bulletin.getGroupId());
+                        
processorBulletin.setGroupName(bulletin.getGroupName());
+                        
processorBulletin.setGroupPath(bulletin.getGroupPath());
+                        processorBulletin.setId(bulletin.getId());
+                        processorBulletin.setLevel(bulletin.getLevel());
+                        processorBulletin.setMessage(bulletin.getMessage());
+                        
processorBulletin.setNodeAddress(bulletin.getNodeAddress());
+                        processorBulletin.setSourceId(bulletin.getSourceId());
+                        
processorBulletin.setSourceName(bulletin.getSourceName());
+                        
processorBulletin.setTimestamp(bulletin.getTimestamp());
+                        return processorBulletin;
+                    }).toList();
+        }
+        return new ArrayList<>();
+    }
 }
\ No newline at end of file
diff --git 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 61afd6f355..761bc7bfc0 100644
--- 
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ 
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -113,7 +113,8 @@ nifi.minifi.sensitive.props.algorithm=
 #c2.security.keystore.password=
 #c2.security.keystore.type=JKS
 #c2.request.compression=none
-
+# Number of processor bulletins exposed as a part of flow info.
+#c2.flow.info.processor.bulletin.limit=0
 
 ### MiNiFi Notifiers For Flow Updates
 # If C2 is enabled these should be disabled

Reply via email to