Repository: nifi Updated Branches: refs/heads/master c07850aec -> dc5e03236
NIFI-3791 - added back pressure data into S2SStatusReportingTask This closes #1745. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc5e0323 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc5e0323 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc5e0323 Branch: refs/heads/master Commit: dc5e032368f09681bde680fc467512ea3f28dfa3 Parents: c07850a Author: Pierre Villard <[email protected]> Authored: Wed May 3 22:56:22 2017 +0200 Committer: Koji Kawamura <[email protected]> Committed: Thu May 25 18:20:11 2017 +0900 ---------------------------------------------------------------------- .../reporting/SiteToSiteStatusReportingTask.java | 8 ++++++-- .../TestSiteToSiteStatusReportingTask.java | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index d5dace2..c3d3da8 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") - .description("The value to use for the platform field in each provenance event.") + .description("The value to use for the platform field in each status record.") .required(true) .expressionLanguageSupported(true) .defaultValue("nifi") @@ -179,7 +179,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa toIndex = Math.min(fromIndex + batchSize, jsonArray.size()); jsonBatch = jsonArray.subList(fromIndex, toIndex); } catch (final IOException e) { - throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e); } } } @@ -343,6 +343,10 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "inputCount", status.getInputCount()); addField(builder, "outputBytes", status.getOutputBytes()); addField(builder, "outputCount", status.getOutputCount()); + addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold()); + addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold()); + addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount()) + || (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes()))); arrayBuilder.add(builder.build()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/dc5e0323/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 3c737d1..443981c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -131,6 +131,24 @@ public class TestSiteToSiteStatusReportingTask { } @Test + public void testConnectionStatus() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled"); + assertEquals("true", backpressure.getString()); + } + + @Test public void testComponentNameFilter() throws IOException, InitializationException { final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
