Repository: nifi
Updated Branches:
  refs/heads/master c07850aec -> dc5e03236


NIFI-3791 - added back pressure data into S2SStatusReportingTask

This closes #1745.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc5e0323
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc5e0323
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc5e0323

Branch: refs/heads/master
Commit: dc5e032368f09681bde680fc467512ea3f28dfa3
Parents: c07850a
Author: Pierre Villard <[email protected]>
Authored: Wed May 3 22:56:22 2017 +0200
Committer: Koji Kawamura <[email protected]>
Committed: Thu May 25 18:20:11 2017 +0900

----------------------------------------------------------------------
 .../reporting/SiteToSiteStatusReportingTask.java  |  8 ++++++--
 .../TestSiteToSiteStatusReportingTask.java        | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index d5dace2..c3d3da8 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
 
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
-        .description("The value to use for the platform field in each provenance event.")
+        .description("The value to use for the platform field in each status record.")
         .required(true)
         .expressionLanguageSupported(true)
         .defaultValue("nifi")
@@ -179,7 +179,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
                 toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
                 jsonBatch = jsonArray.subList(fromIndex, toIndex);
             } catch (final IOException e) {
-                throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
+                throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
             }
         }
     }
@@ -343,6 +343,10 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa
             addField(builder, "inputCount", status.getInputCount());
             addField(builder, "outputBytes", status.getOutputBytes());
             addField(builder, "outputCount", status.getOutputCount());
+            addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
+            addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
+            addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
+                    || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));
 
             arrayBuilder.add(builder.build());
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
index 3c737d1..443981c 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java
@@ -131,6 +131,24 @@ public class TestSiteToSiteStatusReportingTask {
     }
 
     @Test
+    public void testConnectionStatus() throws IOException, InitializationException {
+        final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
+        properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");
+
+        MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
+        task.onTrigger(context);
+
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+        JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
+        JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled");
+        assertEquals("true", backpressure.getString());
+    }
+
+    @Test
     public void testComponentNameFilter() throws IOException, InitializationException {
         final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
 

Reply via email to