This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 58bcd6c NIFI-7106 - Add parent name and parent path in SiteToSiteStatusReportingTask
58bcd6c is described below
commit 58bcd6c5ddc1989e99b5630b89f413ef4726b4a0
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Feb 4 22:35:11 2020 -0500
NIFI-7106 - Add parent name and parent path in SiteToSiteStatusReportingTask
Signed-off-by: Matthew Burgess <[email protected]>
This closes #4039
---
.../reporting/SiteToSiteStatusReportingTask.java | 62 +++++++++++++---------
.../additionalDetails.html | 2 +
.../src/main/resources/schema-status.avsc | 2 +
.../TestSiteToSiteStatusReportingTask.java | 5 ++
4 files changed, 46 insertions(+), 25 deletions(-)
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 2466827..31009f8 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -94,6 +94,7 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
private volatile Pattern componentTypeFilter;
private volatile Pattern componentNameFilter;
+ private volatile Map<String,String> processGroupIDToPath;
public SiteToSiteStatusReportingTask() throws IOException {
final InputStream schema =
getClass().getClassLoader().getResourceAsStream("schema-status.avsc");
@@ -122,6 +123,9 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
componentTypeFilter =
Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).evaluateAttributeExpressions().getValue());
componentNameFilter =
Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).evaluateAttributeExpressions().getValue());
+ // initialize the map
+ processGroupIDToPath = new HashMap<String,String>();
+
final ProcessGroupStatus procGroupStatus =
context.getEventAccess().getControllerStatus();
final String rootGroupName = procGroupStatus == null ? null :
procGroupStatus.getName();
@@ -145,8 +149,8 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
df.setTimeZone(TimeZone.getTimeZone("Z"));
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
- serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus,
df, hostname, rootGroupName,
- platform, null, new Date(), allowNullValues);
+ serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df,
+ hostname, rootGroupName, platform, null, new Date(),
allowNullValues);
final JsonArray jsonArray = arrayBuilder.build();
@@ -230,22 +234,26 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
* The root process group name
* @param platform
* The configured platform
- * @param parentId
- * The parent's component id
+ * @param parent
+ * The parent's process group status object
* @param currentDate
* The current date
* @param allowNullValues
* Allow null values
*/
private void serializeProcessGroupStatus(final JsonArrayBuilder
arrayBuilder, final JsonBuilderFactory factory,
- final ProcessGroupStatus status, final DateFormat df,
- final String hostname, final String applicationName, final String
platform, final String parentId, final Date currentDate, Boolean
allowNullValues) {
+ final ProcessGroupStatus status, final DateFormat df, final String
hostname, final String applicationName,
+ final String platform, final ProcessGroupStatus parent, final Date
currentDate, Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
- final String componentType = (parentId == null) ? "RootProcessGroup" :
"ProcessGroup";
+ final String componentType = parent == null ? "RootProcessGroup" :
"ProcessGroup";
final String componentName = status.getName();
+ if(parent == null) {
+ processGroupIDToPath.put(status.getId(), "NiFi Flow");
+ }
+
if (componentMatchesFilters(componentType, componentName)) {
- addCommonFields(builder, df, hostname, applicationName, platform,
parentId, currentDate,
+ addCommonFields(builder, df, hostname, applicationName, platform,
parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@@ -271,40 +279,43 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
}
for(ProcessGroupStatus childGroupStatus :
status.getProcessGroupStatus()) {
+
+ processGroupIDToPath.put(childGroupStatus.getId(),
processGroupIDToPath.get(status.getId()) + " / " + childGroupStatus.getName());
+
serializeProcessGroupStatus(arrayBuilder, factory,
childGroupStatus, df, hostname,
- applicationName, platform, status.getId(), currentDate,
allowNullValues);
+ applicationName, platform, status, currentDate,
allowNullValues);
}
for(ProcessorStatus processorStatus : status.getProcessorStatus()) {
serializeProcessorStatus(arrayBuilder, factory, processorStatus,
df, hostname,
- applicationName, platform, status.getId(), currentDate,
allowNullValues);
+ applicationName, platform, status, currentDate,
allowNullValues);
}
for(ConnectionStatus connectionStatus : status.getConnectionStatus()) {
serializeConnectionStatus(arrayBuilder, factory, connectionStatus,
df, hostname,
- applicationName, platform, status.getId(), currentDate,
allowNullValues);
+ applicationName, platform, status, currentDate,
allowNullValues);
}
for(PortStatus portStatus : status.getInputPortStatus()) {
serializePortStatus("InputPort", arrayBuilder, factory,
portStatus, df,
- hostname, applicationName, platform, status.getId(),
currentDate, allowNullValues);
+ hostname, applicationName, platform, status, currentDate,
allowNullValues);
}
for(PortStatus portStatus : status.getOutputPortStatus()) {
serializePortStatus("OutputPort", arrayBuilder, factory,
portStatus, df,
- hostname, applicationName, platform, status.getId(),
currentDate, allowNullValues);
+ hostname, applicationName, platform, status, currentDate,
allowNullValues);
}
for(RemoteProcessGroupStatus remoteProcessGroupStatus :
status.getRemoteProcessGroupStatus()) {
serializeRemoteProcessGroupStatus(arrayBuilder, factory,
remoteProcessGroupStatus, df, hostname,
- applicationName, platform, status.getId(), currentDate,
allowNullValues);
+ applicationName, platform, status, currentDate,
allowNullValues);
}
}
private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder
arrayBuilder, final JsonBuilderFactory factory,
final RemoteProcessGroupStatus status, final DateFormat df, final
String hostname, final String applicationName,
- final String platform, final String parentId, final Date
currentDate, final Boolean allowNullValues) {
+ final String platform, final ProcessGroupStatus parent, final Date
currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "RemoteProcessGroup";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
- addCommonFields(builder, df, hostname, applicationName, platform,
parentId, currentDate,
+ addCommonFields(builder, df, hostname, applicationName, platform,
parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@@ -324,12 +335,12 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
}
private void serializePortStatus(final String componentType, final
JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final
PortStatus status,
- final DateFormat df, final String hostname, final String
applicationName, final String platform, final String parentId, final Date
currentDate, final Boolean allowNullValues) {
+ final DateFormat df, final String hostname, final String
applicationName, final String platform, final ProcessGroupStatus parent, final
Date currentDate, final Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
- addCommonFields(builder, df, hostname, applicationName, platform,
parentId, currentDate,
+ addCommonFields(builder, df, hostname, applicationName, platform,
parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@@ -350,13 +361,13 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
}
private void serializeConnectionStatus(final JsonArrayBuilder
arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status,
final DateFormat df,
- final String hostname, final String applicationName, final String
platform, final String parentId, final Date currentDate, final Boolean
allowNullValues) {
+ final String hostname, final String applicationName, final String
platform, final ProcessGroupStatus parent, final Date currentDate, final
Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Connection";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
- addCommonFields(builder, df, hostname, applicationName, platform,
parentId, currentDate,
+ addCommonFields(builder, df, hostname, applicationName, platform,
parent, currentDate,
componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
@@ -383,13 +394,13 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
}
private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder,
final JsonBuilderFactory factory, final ProcessorStatus status, final
DateFormat df,
- final String hostname, final String applicationName, final String
platform, final String parentId, final Date currentDate, final Boolean
allowNullValues) {
+ final String hostname, final String applicationName, final String
platform, final ProcessGroupStatus parent, final Date currentDate, final
Boolean allowNullValues) {
final JsonObjectBuilder builder = factory.createObjectBuilder();
final String componentType = "Processor";
final String componentName = status.getName();
if (componentMatchesFilters(componentType, componentName)) {
- addCommonFields(builder, df, hostname, applicationName, platform,
parentId, currentDate, componentType, componentName, allowNullValues);
+ addCommonFields(builder, df, hostname, applicationName, platform,
parent, currentDate, componentType, componentName, allowNullValues);
addField(builder, "componentId", status.getId(), allowNullValues);
addField(builder, "processorType", status.getType(),
allowNullValues);
@@ -418,7 +429,7 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
}
private void addCommonFields(final JsonObjectBuilder builder, final
DateFormat df, final String hostname,
- final String applicationName, final String platform, final String
parentId, final Date currentDate,
+ final String applicationName, final String platform, final
ProcessGroupStatus parent, final Date currentDate,
final String componentType, final String componentName, Boolean
allowNullValues) {
addField(builder, "statusId", UUID.randomUUID().toString(),
allowNullValues);
addField(builder, "timestampMillis", currentDate.getTime(),
allowNullValues);
@@ -426,12 +437,13 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
addField(builder, "actorHostname", hostname, allowNullValues);
addField(builder, "componentType", componentType, allowNullValues);
addField(builder, "componentName", componentName, allowNullValues);
- addField(builder, "parentId", parentId, allowNullValues);
+ addField(builder, "parentId", parent == null ? null : parent.getId(),
allowNullValues);
+ addField(builder, "parentName", parent == null ? null :
parent.getName(), allowNullValues);
+ addField(builder, "parentPath", parent == null ? null :
processGroupIDToPath.get(parent.getId()), allowNullValues);
addField(builder, "platform", platform, allowNullValues);
addField(builder, "application", applicationName, allowNullValues);
}
-
private static void addField(final JsonObjectBuilder builder, final
JsonBuilderFactory factory, final String key, final Map<String, Long> values,
final Boolean allowNullValues) {
if (values != null) {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
index dad5656..f6dca55 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html
@@ -51,6 +51,8 @@
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
+ { "name" : "parentName", "type" : ["string", "null"]},
+ { "name" : "parentPath", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
index 1932c7e..70e4af1 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc
@@ -12,6 +12,8 @@
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
+ { "name" : "parentName", "type" : ["string", "null"]},
+ { "name" : "parentPath", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
index 9914bf1..3dfab49 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
@@ -333,6 +334,10 @@ public class TestSiteToSiteStatusReportingTask {
final String msg = new String(task.dataSent.get(0),
StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new
ByteArrayInputStream(msg.getBytes()));
JsonObject object = jsonReader.readArray().getJsonObject(0);
+ JsonString parentName = object.getJsonString("parentName");
+ assertTrue(parentName.getString().startsWith("Awesome.1-"));
+ JsonString parentPath = object.getJsonString("parentPath");
+ assertTrue(parentPath.getString().startsWith("NiFi Flow / Awesome.1"));
JsonString runStatus = object.getJsonString("runStatus");
assertEquals(RunStatus.Running.name(), runStatus.getString());
JsonNumber inputBytes = object.getJsonNumber("inputBytes");