This is an automated email from the ASF dual-hosted git repository.
ferdei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 83b701a25e NIFI-13635 Expose processor bulletins as a part of FlowInfo
83b701a25e is described below
commit 83b701a25ea2820d20768ca4946f5698249beb0a
Author: Peter Kedvessy <[email protected]>
AuthorDate: Tue Aug 6 23:37:56 2024 +0200
NIFI-13635 Expose processor bulletins as a part of FlowInfo
Signed-off-by: Ferenc Erdei <[email protected]>
This closes #9161.
---
.../org/apache/nifi/c2/client/C2ClientConfig.java | 12 ++
.../nifi/c2/client/service/C2HeartbeatFactory.java | 7 +-
.../client/service/model/RuntimeInfoWrapper.java | 10 +-
.../c2/client/service/C2HeartbeatFactoryTest.java | 15 ++-
.../DescribeManifestOperationHandlerTest.java | 2 +-
.../org/apache/nifi/c2/protocol/api/FlowInfo.java | 11 ++
.../nifi/c2/protocol/api/ProcessorBulletin.java | 148 +++++++++++++++++++++
.../nifi/minifi/commons/api/MiNiFiProperties.java | 1 +
.../apache/nifi/minifi/c2/C2NifiClientService.java | 54 +++++++-
.../src/main/resources/conf/bootstrap.conf | 3 +-
10 files changed, 250 insertions(+), 13 deletions(-)
diff --git
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index d9cd3aa90f..070cc02d16 100644
---
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -58,6 +58,7 @@ public class C2ClientConfig {
private final String c2RequestCompression;
private final String c2AssetDirectory;
private final long bootstrapAcknowledgeTimeout;
+ private final int c2FlowInfoProcessorBulletinLimit;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@@ -88,6 +89,7 @@ public class C2ClientConfig {
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
+ this.c2FlowInfoProcessorBulletinLimit =
builder.c2FlowInfoProcessorBulletinLimit;
}
public String getC2Url() {
@@ -202,6 +204,10 @@ public class C2ClientConfig {
return bootstrapAcknowledgeTimeout;
}
+ public int getC2FlowInfoProcessorBulletinLimit() {
+ return c2FlowInfoProcessorBulletinLimit;
+ }
+
/**
* Builder for client configuration.
*/
@@ -238,6 +244,7 @@ public class C2ClientConfig {
private String c2RequestCompression;
private String c2AssetDirectory;
private long bootstrapAcknowledgeTimeout;
+ private int c2FlowInfoProcessorBulletinLimit;
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
@@ -389,6 +396,11 @@ public class C2ClientConfig {
return this;
}
+ public Builder c2FlowInfoProcessorBulletinLimit(int
c2FlowInfoProcessorBulletinLimit) {
+ this.c2FlowInfoProcessorBulletinLimit =
c2FlowInfoProcessorBulletinLimit;
+ return this;
+ }
+
public C2ClientConfig build() {
return new C2ClientConfig(this);
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index 06549b5663..653d9ba48f 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -35,6 +35,7 @@ import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -47,6 +48,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
@@ -89,7 +91,7 @@ public class C2HeartbeatFactory {
heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(),
runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
-
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
+ heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(),
runtimeInfoWrapper.getProcessorBulletins()));
heartbeat.setCreated(System.currentTimeMillis());
ResourceInfo resourceInfo = new ResourceInfo();
@@ -99,9 +101,10 @@ public class C2HeartbeatFactory {
return heartbeat;
}
- private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus) {
+ private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus,
List<ProcessorBulletin> processorBulletins) {
FlowInfo flowInfo = new FlowInfo();
flowInfo.setQueues(queueStatus);
+ flowInfo.setProcessorBulletins(processorBulletins);
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
index c017ac0d11..b7653ea424 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/model/RuntimeInfoWrapper.java
@@ -16,20 +16,24 @@
*/
package org.apache.nifi.c2.client.service.model;
+import java.util.List;
import java.util.Map;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
public class RuntimeInfoWrapper {
final AgentRepositories repos;
final RuntimeManifest manifest;
final Map<String, FlowQueueStatus> queueStatus;
+ final List<ProcessorBulletin> processorBulletins;
- public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest
manifest, Map<String, FlowQueueStatus> queueStatus) {
+ public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest
manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin>
processorBulletins) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
+ this.processorBulletins = processorBulletins;
}
public AgentRepositories getAgentRepositories() {
@@ -43,4 +47,8 @@ public class RuntimeInfoWrapper {
public Map<String, FlowQueueStatus> getQueueStatus() {
return queueStatus;
}
+
+ public List<ProcessorBulletin> getProcessorBulletins() {
+ return processorBulletins;
+ }
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
index 4fb20aeb86..ceaa42d4fa 100644
---
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -24,9 +24,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@@ -37,6 +39,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.OperationType;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.component.api.Bundle;
@@ -117,12 +120,14 @@ public class C2HeartbeatFactoryTest {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+ List<ProcessorBulletin> processorBulletins = new ArrayList<>();
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
assertEquals(repos,
heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+ assertEquals(processorBulletins,
heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@@ -134,12 +139,14 @@ public class C2HeartbeatFactoryTest {
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+ List<ProcessorBulletin> processorBulletins = new ArrayList<>();
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins));
assertEquals(repos,
heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+ assertEquals(processorBulletins,
heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@@ -156,7 +163,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(),
Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>()));
assertEquals(MANIFEST_HASH,
heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
@@ -172,7 +179,7 @@ public class C2HeartbeatFactoryTest {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(),
supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
- C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new
RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new
ArrayList<>()));
assertEquals(MANIFEST_HASH,
heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
index f4508e7fed..24bcc0700a 100644
---
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java
@@ -58,7 +58,7 @@ public class DescribeManifestOperationHandlerTest {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
- RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null,
manifest, null);
+ RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null,
manifest, null, null);
C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
index b6c9cc93ec..f2f683c1a3 100644
---
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/FlowInfo.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
public class FlowInfo implements Serializable {
@@ -29,6 +30,7 @@ public class FlowInfo implements Serializable {
private FlowUri flowUri;
private Map<String, ComponentStatus> components;
private Map<String, FlowQueueStatus> queues;
+ private List<ProcessorBulletin> processorBulletins;
@Schema(description = "A unique identifier of the flow currently deployed
on the agent")
public String getFlowId() {
@@ -66,4 +68,13 @@ public class FlowInfo implements Serializable {
this.queues = queues;
}
+ @Schema(description = "Bulletins of each processors")
+ public List<ProcessorBulletin> getProcessorBulletins() {
+ return processorBulletins;
+ }
+
+ public void setProcessorBulletins(List<ProcessorBulletin>
processorBulletins) {
+ this.processorBulletins = processorBulletins;
+ }
+
}
diff --git
a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
new file mode 100644
index 0000000000..a10ba225d4
--- /dev/null
+++
b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ProcessorBulletin.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.protocol.api;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.io.Serializable;
+import java.util.Date;
+
+public class ProcessorBulletin implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Date timestamp;
+ private long id;
+ private String nodeAddress;
+ private String level;
+ private String category;
+ private String message;
+ private String groupId;
+ private String groupName;
+ private String groupPath;
+ private String sourceId;
+ private String sourceName;
+ private String flowFileUuid;
+
+ @Schema(description = "When this bulletin was generated")
+ public Date getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Date timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Schema(description = "The id of the bulletin")
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ @Schema(description = "If clustered, the address of the node from which the bulletin originated")
+ public String getNodeAddress() {
+ return nodeAddress;
+ }
+
+ public void setNodeAddress(String nodeAddress) {
+ this.nodeAddress = nodeAddress;
+ }
+
+ @Schema(description = "The level of the bulletin")
+ public String getLevel() {
+ return level;
+ }
+
+ public void setLevel(String level) {
+ this.level = level;
+ }
+
+ @Schema(description = "The category of this bulletin")
+ public String getCategory() {
+ return category;
+ }
+
+ public void setCategory(String category) {
+ this.category = category;
+ }
+
+ @Schema(description = "The bulletin message")
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Schema(description = "The group id of the source component")
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ @Schema(description = "The group name of the source component")
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+ @Schema(description = "The group path of the source component")
+ public String getGroupPath() {
+ return groupPath;
+ }
+
+ public void setGroupPath(String groupPath) {
+ this.groupPath = groupPath;
+ }
+
+ @Schema(description = "The id of the source component")
+ public String getSourceId() {
+ return sourceId;
+ }
+
+ public void setSourceId(String sourceId) {
+ this.sourceId = sourceId;
+ }
+
+ @Schema(description = "The name of the source component")
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public void setSourceName(String sourceName) {
+ this.sourceName = sourceName;
+ }
+
+ @Schema(description = "The id of the flow file")
+ public String getFlowFileUuid() {
+ return flowFileUuid;
+ }
+
+ public void setFlowFileUuid(String flowFileUuid) {
+ this.flowFileUuid = flowFileUuid;
+ }
+}
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
index b346eed20b..c40d2be198 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
+++
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
@@ -92,6 +92,7 @@ public enum MiNiFiProperties {
C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false,
false, VALID),
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true,
VALID),
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15 sec", false, true, VALID),
+ C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT("c2.flow.info.processor.bulletin.limit", "1000", false, true, NON_NEGATIVE_INTEGER_VALIDATOR ),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null,
false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path",
null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds",
null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 677798976a..02366147d5 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -44,6 +44,7 @@ import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_PATH_H
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_READ_TIMEOUT;
import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_REST_URL_ACK;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_RUNTIME_TYPE;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION;
@@ -59,11 +60,14 @@ import static
org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.c2.client.C2ClientConfig;
@@ -90,6 +94,7 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.controller.FlowController;
@@ -113,6 +118,8 @@ import
org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -173,8 +180,9 @@ public class C2NifiClientService {
this.c2OperationManager = new C2OperationManager(
client, c2OperationHandlerProvider, heartbeatLock,
operationQueueDAO, c2OperationRestartHandler);
+ Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () ->
generateRuntimeInfo(clientConfig.getC2FlowInfoProcessorBulletinLimit());
this.c2HeartbeatManager = new C2HeartbeatManager(
- client, heartbeatFactory, heartbeatLock,
this::generateRuntimeInfo, c2OperationManager);
+ client, heartbeatFactory, heartbeatLock,
runtimeInfoWrapperSupplier, c2OperationManager);
}
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
@@ -206,6 +214,8 @@ public class C2NifiClientService {
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(),
C2_REST_PATH_HEARTBEAT.getDefaultValue()))
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(),
C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties,
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
+ .c2FlowInfoProcessorBulletinLimit(parseInt(properties
+
.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())))
.build();
}
@@ -231,10 +241,12 @@ public class C2NifiClientService {
UpdateConfigurationStrategy updateConfigurationStrategy = new
DefaultUpdateConfigurationStrategy(flowController, flowService,
new StandardFlowEnrichService(niFiProperties),
flowPropertyEncryptor,
StandardFlowSerDeService.defaultInstance(),
niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
+ Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () ->
generateRuntimeInfo(
+
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())));
return new C2OperationHandlerProvider(List.of(
new UpdateConfigurationOperationHandler(client, flowIdHolder,
updateConfigurationStrategy, emptyOperandPropertiesProvider),
- new DescribeManifestOperationHandler(heartbeatFactory,
this::generateRuntimeInfo, emptyOperandPropertiesProvider),
+ new DescribeManifestOperationHandler(heartbeatFactory,
runtimeInfoWrapperSupplier, emptyOperandPropertiesProvider),
TransferDebugOperationHandler.create(client,
emptyOperandPropertiesProvider,
transferDebugCommandHelper.debugBundleFiles(),
transferDebugCommandHelper::excludeSensitiveText),
UpdateAssetOperationHandler.create(client,
emptyOperandPropertiesProvider,
@@ -271,10 +283,10 @@ public class C2NifiClientService {
}
}
- private synchronized RuntimeInfoWrapper generateRuntimeInfo() {
+ private synchronized RuntimeInfoWrapper generateRuntimeInfo(int
processorBulletinLimit) {
AgentManifest agentManifest = new
AgentManifest(runtimeManifestService.getManifest());
agentManifest.setSupportedOperations(supportedOperationsProvider.getSupportedOperations());
- return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest,
getQueueStatus());
+ return new RuntimeInfoWrapper(getAgentRepositories(), agentManifest,
getQueueStatus(), getBulletins(processorBulletinLimit));
}
private AgentRepositories getAgentRepositories() {
@@ -321,4 +333,38 @@ public class C2NifiClientService {
})
.collect(toMap(Pair::getKey, Pair::getValue));
}
+
+ private List<ProcessorBulletin> getBulletins(int processorBulletinLimit) {
+ if (processorBulletinLimit > 0) {
+ String groupId = flowController.getEventAccess()
+ .getGroupStatus(ROOT_GROUP_ID)
+ .getId();
+ BulletinQuery query = new BulletinQuery.Builder()
+ .sourceType(ComponentType.PROCESSOR)
+ .groupIdMatches(groupId)
+ .limit(processorBulletinLimit)
+ .build();
+
+ return flowController.getBulletinRepository()
+ .findBulletins(query)
+ .stream()
+ .map(bulletin -> {
+ ProcessorBulletin processorBulletin = new
ProcessorBulletin();
+ processorBulletin.setCategory(bulletin.getCategory());
+
processorBulletin.setFlowFileUuid(bulletin.getFlowFileUuid());
+ processorBulletin.setGroupId(bulletin.getGroupId());
+
processorBulletin.setGroupName(bulletin.getGroupName());
+
processorBulletin.setGroupPath(bulletin.getGroupPath());
+ processorBulletin.setId(bulletin.getId());
+ processorBulletin.setLevel(bulletin.getLevel());
+ processorBulletin.setMessage(bulletin.getMessage());
+
processorBulletin.setNodeAddress(bulletin.getNodeAddress());
+ processorBulletin.setSourceId(bulletin.getSourceId());
+
processorBulletin.setSourceName(bulletin.getSourceName());
+
processorBulletin.setTimestamp(bulletin.getTimestamp());
+ return processorBulletin;
+ }).toList();
+ }
+ return new ArrayList<>();
+ }
}
\ No newline at end of file
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index 61afd6f355..761bc7bfc0 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -113,7 +113,8 @@ nifi.minifi.sensitive.props.algorithm=
#c2.security.keystore.password=
#c2.security.keystore.type=JKS
#c2.request.compression=none
-
+# Maximum number of processor bulletins exposed as a part of flow info. 0 disables bulletin reporting; defaults to 1000 when unset.
+#c2.flow.info.processor.bulletin.limit=0
### MiNiFi Notifiers For Flow Updates
# If C2 is enabled these should be disabled