Repository: nifi
Updated Branches:
  refs/heads/0.x f8e655548 -> 7f2f01096


NIFI-1755 (0.x branch) Fixed remote process group status counts by only 
considering connected remote ports


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f2f0109
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f2f0109
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f2f0109

Branch: refs/heads/0.x
Commit: 7f2f01096c12e8b4eb0c61ec1191ffc8236cf5a4
Parents: f8e6555
Author: Pierre Villard <[email protected]>
Authored: Thu May 12 21:53:14 2016 +0200
Committer: Oleg Zhurakousky <[email protected]>
Committed: Mon May 16 13:45:25 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  |  30 ++---
 .../remote/RemoteProcessGroupStatusTest.java    | 133 +++++++++++++++++++
 2 files changed, 148 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7f2f0109/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index e6c013b..a5f2ee0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2415,7 +2415,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // determine if this input port is connected
             final boolean isConnected = port.hasIncomingConnection();
 
-            // we only want to conside remote ports that we are connected to
+            // we only want to consider remote ports that we are connected to
             if (isConnected) {
                 if (port.isRunning()) {
                     activePortCount++;
@@ -2424,15 +2424,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 }
 
                 activeThreadCount += 
processScheduler.getActiveThreadCount(port);
-            }
 
-            final FlowFileEvent portEvent = 
statusReport.getReportEntry(port.getIdentifier());
-            if (portEvent != null) {
-                lineageMillis += portEvent.getAggregateLineageMillis();
-                flowFilesRemoved += portEvent.getFlowFilesRemoved();
-                flowFilesTransferred += portEvent.getFlowFilesOut();
-                sentCount += portEvent.getFlowFilesSent();
-                sentContentSize += portEvent.getBytesSent();
+                final FlowFileEvent portEvent = 
statusReport.getReportEntry(port.getIdentifier());
+                if (portEvent != null) {
+                    lineageMillis += portEvent.getAggregateLineageMillis();
+                    flowFilesRemoved += portEvent.getFlowFilesRemoved();
+                    flowFilesTransferred += portEvent.getFlowFilesOut();
+                    sentCount += portEvent.getFlowFilesSent();
+                    sentContentSize += portEvent.getBytesSent();
+                }
             }
         }
 
@@ -2440,7 +2440,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // determine if this output port is connected
             final boolean isConnected = !port.getConnections().isEmpty();
 
-            // we only want to conside remote ports that we are connected from
+            // we only want to consider remote ports that we are connected from
             if (isConnected) {
                 if (port.isRunning()) {
                     activePortCount++;
@@ -2449,12 +2449,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 }
 
                 activeThreadCount += 
processScheduler.getActiveThreadCount(port);
-            }
 
-            final FlowFileEvent portEvent = 
statusReport.getReportEntry(port.getIdentifier());
-            if (portEvent != null) {
-                receivedCount += portEvent.getFlowFilesReceived();
-                receivedContentSize += portEvent.getBytesReceived();
+                final FlowFileEvent portEvent = 
statusReport.getReportEntry(port.getIdentifier());
+                if (portEvent != null) {
+                    receivedCount += portEvent.getFlowFilesReceived();
+                    receivedContentSize += portEvent.getBytesReceived();
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f2f0109/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/RemoteProcessGroupStatusTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/RemoteProcessGroupStatusTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/RemoteProcessGroupStatusTest.java
new file mode 100644
index 0000000..56f7074
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/RemoteProcessGroupStatusTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Collections;
+
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.admin.service.UserService;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.repository.RepositoryStatusReport;
+import org.apache.nifi.controller.repository.RingBufferEventRepository;
+import org.apache.nifi.controller.repository.StandardFlowFileEvent;
+import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.provenance.MockProvenanceEventRepository;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RemoteProcessGroupStatusTest {
+
+    private final UserService userService = mock(UserService.class);
+    private final AuditService auditService = mock(AuditService.class);
+    private volatile FlowController controller;
+
+    @BeforeClass
+    public static void beforeClass() {
+        try {
+            URL url = 
ClassLoader.getSystemClassLoader().getResource("nifi.properties");
+            System.setProperty("nifi.properties.file.path", url.getFile());
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to discover 
nifi.properties at the root of the classpath", e);
+        }
+    }
+
+    @Before
+    public void before() {
+        NiFiProperties properties = NiFiProperties.getInstance();
+        
properties.setProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, 
MockProvenanceEventRepository.class.getName());
+        properties.setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, 
"src/test/resources/state-management.xml");
+        
properties.setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, 
"local-provider");
+        properties.setProperty(NiFiProperties.REMOTE_INPUT_HOST, "localhost");
+        properties.setProperty("nifi.remote.input.secure", "false");
+
+        RingBufferEventRepository repository = new 
RingBufferEventRepository(1);
+        this.controller = FlowController.createStandaloneInstance(repository, 
NiFiProperties.getInstance(),
+                this.userService, this.auditService, null);
+    }
+
+    @After
+    public void after() {
+        this.controller.shutdown(false);
+    }
+
+    /**
+     * This test show that statistics are computed only if ports are
+     * correctly connected with process groups.
+     * @throws Exception exception
+     */
+    @Test
+    public void testStatusCountersWhenPortsDisconnected() throws Exception {
+        ProcessGroup receiverGroup = controller.createProcessGroup("SITE");
+        setControllerRootGroup(this.controller, receiverGroup);
+
+        RemoteProcessGroup remoteProcessGroup = 
controller.createRemoteProcessGroup("SENDER_REMOTE","http://foo:1234/nifi";);
+        receiverGroup.addRemoteProcessGroup(remoteProcessGroup);
+
+        String inputPortId = "inputId";
+        StandardRemoteProcessGroupPortDescriptor inputPortDescriptor = new 
StandardRemoteProcessGroupPortDescriptor();
+        inputPortDescriptor.setId(inputPortId);
+        inputPortDescriptor.setName("inputPort");
+        
remoteProcessGroup.setInputPorts(Collections.<RemoteProcessGroupPortDescriptor> 
singleton(inputPortDescriptor));
+        RemoteGroupPort inputPort = 
remoteProcessGroup.getInputPort(inputPortId);
+        inputPort.setProcessGroup(receiverGroup);
+
+        String outputPortId = "outputId";
+        StandardRemoteProcessGroupPortDescriptor outputPortDescriptor = new 
StandardRemoteProcessGroupPortDescriptor();
+        outputPortDescriptor.setId(outputPortId);
+        outputPortDescriptor.setName("outputPort");
+        
remoteProcessGroup.setOutputPorts(Collections.<RemoteProcessGroupPortDescriptor>
 singleton(outputPortDescriptor));
+        RemoteGroupPort outputPort = 
remoteProcessGroup.getOutputPort(outputPortId);
+        outputPort.setProcessGroup(receiverGroup);
+
+        RepositoryStatusReport rp = new StandardRepositoryStatusReport();
+        StandardFlowFileEvent inputEvent = new 
StandardFlowFileEvent(inputPortId);
+        inputEvent.setBytesSent(5);
+        rp.addReportEntry(inputEvent);
+
+        StandardFlowFileEvent outputEvent = new 
StandardFlowFileEvent(outputPortId);
+        outputEvent.setFlowFilesReceived(5);
+        rp.addReportEntry(outputEvent);
+
+        ProcessGroupStatus status = controller.getGroupStatus(receiverGroup, 
rp);
+        assertEquals(0, status.getFlowFilesReceived());
+        assertEquals(0, status.getBytesSent());
+    }
+
+    private void setControllerRootGroup(FlowController controller, 
ProcessGroup processGroup) {
+        try {
+            Method m = FlowController.class.getDeclaredMethod("setRootGroup", 
ProcessGroup.class);
+            m.setAccessible(true);
+            m.invoke(controller, processGroup);
+            controller.initializeFlow();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to set root group", e);
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to