Repository: nifi
Updated Branches:
  refs/heads/master 5872eb3c4 -> 619f1ffe8


http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 5667094..efe2bd4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -16,55 +16,13 @@
  */
 package org.apache.nifi.controller.repository;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -99,6 +57,49 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TestStandardProcessSession {
 
     private StandardProcessSession session;
@@ -207,7 +208,7 @@ public class TestStandardProcessSession {
         final FlowFileSwapManager swapManager = 
Mockito.mock(FlowFileSwapManager.class);
         final ProcessScheduler processScheduler = 
Mockito.mock(ProcessScheduler.class);
 
-        final StandardFlowFileQueue actualQueue = new 
StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null,
+        final StandardFlowFileQueue actualQueue = new 
StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, 
provenanceRepo, null,
                 processScheduler, swapManager, null, 10000, 0L, "0 B");
         return Mockito.spy(actualQueue);
     }
@@ -1515,7 +1516,7 @@ public class TestStandardProcessSession {
 
         final FlowFile originalFlowFile = session.get();
         assertTrue(flowFileQueue.isActiveQueueEmpty());
-        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertTrue(flowFileQueue.isUnacknowledgedFlowFile());
 
         final FlowFile modified = session.write(originalFlowFile, new 
OutputStreamCallback() {
             @Override
@@ -1538,7 +1539,7 @@ public class TestStandardProcessSession {
 
         assertFalse(flowFileQueue.isActiveQueueEmpty());
         assertEquals(1, flowFileQueue.size().getObjectCount());
-        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
     }
 
     @Test
@@ -1552,7 +1553,7 @@ public class TestStandardProcessSession {
 
         final FlowFile originalFlowFile = session.get();
         assertTrue(flowFileQueue.isActiveQueueEmpty());
-        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertTrue(flowFileQueue.isUnacknowledgedFlowFile());
 
         final FlowFile modified = session.write(originalFlowFile, new 
OutputStreamCallback() {
             @Override
@@ -1569,7 +1570,7 @@ public class TestStandardProcessSession {
         session.rollback();
         assertTrue(flowFileQueue.isActiveQueueEmpty());
         assertEquals(0, flowFileQueue.size().getObjectCount());
-        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
 
         session.rollback();
 
@@ -1578,7 +1579,7 @@ public class TestStandardProcessSession {
 
         final FlowFile originalRound2 = session.get();
         assertTrue(flowFileQueue.isActiveQueueEmpty());
-        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertTrue(flowFileQueue.isUnacknowledgedFlowFile());
 
         final FlowFile modifiedRound2 = session.write(originalRound2, new 
OutputStreamCallback() {
             @Override
@@ -1591,13 +1592,13 @@ public class TestStandardProcessSession {
 
         session.checkpoint();
         assertTrue(flowFileQueue.isActiveQueueEmpty());
-        assertEquals(1, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertTrue(flowFileQueue.isUnacknowledgedFlowFile());
 
         session.commit();
 
         // FlowFile transferred back to queue
         assertEquals(1, flowFileQueue.size().getObjectCount());
-        assertEquals(0, 
flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
+        assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
         assertFalse(flowFileQueue.isActiveQueueEmpty());
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 671c58d..402ce06 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -16,35 +16,20 @@
  */
 package org.apache.nifi.controller.repository;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueSize;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.StandardLocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -69,6 +54,28 @@ import org.mockito.stubbing.Answer;
 import org.wali.MinimalLockingWriteAheadLog;
 import org.wali.WriteAheadRepository;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
 @SuppressWarnings("deprecation")
 public class TestWriteAheadFlowFileRepository {
 
@@ -92,6 +99,20 @@ public class TestWriteAheadFlowFileRepository {
     @Ignore("Intended only for local performance testing before/after making 
changes")
     public void testUpdatePerformance() throws IOException, 
InterruptedException {
         final FlowFileQueue queue = new FlowFileQueue() {
+            private LoadBalanceCompression compression = 
LoadBalanceCompression.DO_NOT_COMPRESS;
+
+            @Override
+            public void startLoadBalancing() {
+            }
+
+            @Override
+            public void stopLoadBalancing() {
+            }
+
+            @Override
+            public boolean isActivelyLoadBalancing() {
+                return false;
+            }
 
             @Override
             public String getIdentifier() {
@@ -113,11 +134,6 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
-            public int getSwapFileCount() {
-                return 0;
-            }
-
-            @Override
             public void setPriorities(List<FlowFilePrioritizer> newPriorities) 
{
             }
 
@@ -155,21 +171,6 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
-            public QueueSize getUnacknowledgedQueueSize() {
-                return null;
-            }
-
-            @Override
-            public QueueSize getActiveQueueSize() {
-                return size();
-            }
-
-            @Override
-            public QueueSize getSwapQueueSize() {
-                return null;
-            }
-
-            @Override
             public void acknowledge(FlowFileRecord flowFile) {
             }
 
@@ -178,12 +179,7 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
-            public boolean isAllActiveFlowFilesPenalized() {
-                return false;
-            }
-
-            @Override
-            public boolean isAnyActiveFlowFilePenalized() {
+            public boolean isUnacknowledgedFlowFile() {
                 return false;
             }
 
@@ -211,11 +207,6 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
-            public long drainQueue(Queue<FlowFileRecord> sourceQueue, 
List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> 
expiredRecords) {
-                return 0;
-            }
-
-            @Override
             public List<FlowFileRecord> poll(FlowFileFilter filter, 
Set<FlowFileRecord> expiredRecords) {
                 return null;
             }
@@ -272,6 +263,44 @@ public class TestWriteAheadFlowFileRepository {
             @Override
             public void verifyCanList() throws IllegalStateException {
             }
+
+            @Override
+            public QueueDiagnostics getQueueDiagnostics() {
+                final FlowFileQueueSize size = new 
FlowFileQueueSize(size().getObjectCount(), size().getByteCount(), 0, 0, 0, 0, 
0);
+                return new StandardQueueDiagnostics(new 
StandardLocalQueuePartitionDiagnostics(size, false, false), 
Collections.emptyList());
+            }
+
+            @Override
+            public void lock() {
+            }
+
+            @Override
+            public void unlock() {
+            }
+
+            @Override
+            public void setLoadBalanceStrategy(final LoadBalanceStrategy 
strategy, final String partitioningAttribute) {
+            }
+
+            @Override
+            public LoadBalanceStrategy getLoadBalanceStrategy() {
+                return null;
+            }
+
+            @Override
+            public void setLoadBalanceCompression(final LoadBalanceCompression 
compression) {
+                this.compression = compression;
+            }
+
+            @Override
+            public LoadBalanceCompression getLoadBalanceCompression() {
+                return compression;
+            }
+
+            @Override
+            public String getPartitioningAttribute() {
+                return null;
+            }
         };
 
 
@@ -370,7 +399,7 @@ public class TestWriteAheadFlowFileRepository {
         
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
 
         final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
-        final FlowFileQueue queue = new StandardFlowFileQueue("1234", 
connection, null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
+        final FlowFileQueue queue = new StandardFlowFileQueue("1234", new 
NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 
10000, 0L, "0 B");
 
         when(connection.getFlowFileQueue()).thenReturn(queue);
         queueProvider.addConnection(connection);
@@ -414,7 +443,7 @@ public class TestWriteAheadFlowFileRepository {
             records.add(rec2);
             repo.updateRepository(records);
 
-            final String swapLocation = 
swapMgr.swapOut(Collections.singletonList(flowFile2), queue);
+            final String swapLocation = 
swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null);
             repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, 
swapLocation);
         }
 
@@ -546,7 +575,7 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         @Override
-        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue 
flowFileQueue) throws IOException {
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue 
flowFileQueue, final String partitionName) throws IOException {
             Map<String, List<FlowFileRecord>> swapMap = 
swappedRecords.get(flowFileQueue);
             if (swapMap == null) {
                 swapMap = new HashMap<>();
@@ -583,7 +612,7 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         @Override
-        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) 
throws IOException {
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, 
final String partitionName) throws IOException {
             Map<String, List<FlowFileRecord>> swapMap = 
swappedRecords.get(flowFileQueue);
             if (swapMap == null) {
                 return null;
@@ -631,5 +660,14 @@ public class TestWriteAheadFlowFileRepository {
             this.swappedRecords.clear();
         }
 
+        @Override
+        public Set<String> getSwappedPartitionNames(FlowFileQueue queue) 
throws IOException {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public String changePartitionName(String swapLocation, String 
newPartitionName) throws IOException {
+            return swapLocation;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks
new file mode 100755
index 0000000..6db775d
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks
new file mode 100755
index 0000000..5a4fd2d
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
index 560363c..f6cb8c5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
@@ -20,15 +20,15 @@
             <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
         </encoder>
     </appender>
-    
+
     <appender name="FILE" class="ch.qos.logback.core.FileAppender">
         <file>./target/log</file>
         <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
             <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
         </encoder>
     </appender>
-    
-    
+
+
     <logger name="org.apache.nifi" level="INFO"/>
     <logger name="org.apache.nifi.controller.service" level="DEBUG"/>
     <logger name="org.apache.nifi.encrypt" level="DEBUG"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
index 7e87199..8908956 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.controller.repository.claim;
 
 public class StandardResourceClaim implements ResourceClaim, 
Comparable<ResourceClaim> {
-    private final StandardResourceClaimManager claimManager;
+    private final ResourceClaimManager claimManager;
     private final String id;
     private final String container;
     private final String section;
@@ -25,7 +25,7 @@ public class StandardResourceClaim implements ResourceClaim, 
Comparable<Resource
     private final int hashCode;
     private volatile boolean writable = true;
 
-    public StandardResourceClaim(final StandardResourceClaimManager 
claimManager, final String container, final String section, final String id, 
final boolean lossTolerant) {
+    public StandardResourceClaim(final ResourceClaimManager claimManager, 
final String container, final String section, final String id, final boolean 
lossTolerant) {
         this.claimManager = claimManager;
         this.container = container.intern();
         this.section = section.intern();

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index def8e89..c186e54 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -183,6 +183,13 @@
         <nifi.cluster.flow.election.max.wait.time>5 
mins</nifi.cluster.flow.election.max.wait.time>
         <nifi.cluster.flow.election.max.candidates />
 
+        <!-- nifi.properties: cluster load balance properties -->
+        <nifi.cluster.load.balance.host></nifi.cluster.load.balance.host>
+        <nifi.cluster.load.balance.port>7430</nifi.cluster.load.balance.port>
+        
<nifi.cluster.load.balance.connections.per.node>4</nifi.cluster.load.balance.connections.per.node>
+        
<nifi.cluster.load.balance.max.thread.count>8</nifi.cluster.load.balance.max.thread.count>
+        <nifi.cluster.load.balance.comms.timeout>30 
sec</nifi.cluster.load.balance.comms.timeout>
+
         <nifi.cluster.request.replication.claim.timeout>15 
secs</nifi.cluster.request.replication.claim.timeout>
 
         <!--  nifi.properties: zookeeper properties -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index c875bd9..a37e8b1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -216,6 +216,13 @@ nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
 
nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time}
 
nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates}
 
+# cluster load balancing properties #
+nifi.cluster.load.balance.host=${nifi.cluster.load.balance.host}
+nifi.cluster.load.balance.port=${nifi.cluster.load.balance.port}
+nifi.cluster.load.balance.connections.per.node=${nifi.cluster.load.balance.connections.per.node}
+nifi.cluster.load.balance.max.thread.count=${nifi.cluster.load.balance.max.thread.count}
+nifi.cluster.load.balance.comms.timeout=${nifi.cluster.load.balance.comms.timeout}
+
 # zookeeper properties, used for cluster management #
 nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
 nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6c2c3bc..06625f5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -88,7 +88,11 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileSummary;
 import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.QueueDiagnostics;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -165,6 +169,7 @@ import 
org.apache.nifi.web.api.dto.action.details.MoveDetailsDTO;
 import org.apache.nifi.web.api.dto.action.details.PurgeDetailsDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ClassLoaderDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
+import 
org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.diagnostics.GCDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.GarbageCollectionDiagnosticsDTO;
@@ -173,7 +178,9 @@ import 
org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.JVMFlowDiagnosticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.diagnostics.LocalQueuePartitionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
+import org.apache.nifi.web.api.dto.diagnostics.RemoteQueuePartitionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.RepositoryUsageDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;
 import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
@@ -669,11 +676,13 @@ public final class DtoFactory {
         dto.setDestination(createConnectableDto(connection.getDestination()));
         
dto.setVersionedComponentId(connection.getVersionedComponentId().orElse(null));
 
-        
dto.setBackPressureObjectThreshold(connection.getFlowFileQueue().getBackPressureObjectThreshold());
-        
dto.setBackPressureDataSizeThreshold(connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
-        
dto.setFlowFileExpiration(connection.getFlowFileQueue().getFlowFileExpiration());
+        final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
+
+        
dto.setBackPressureObjectThreshold(flowFileQueue.getBackPressureObjectThreshold());
+        
dto.setBackPressureDataSizeThreshold(flowFileQueue.getBackPressureDataSizeThreshold());
+        dto.setFlowFileExpiration(flowFileQueue.getFlowFileExpiration());
         dto.setPrioritizers(new ArrayList<String>());
-        for (final FlowFilePrioritizer comparator : 
connection.getFlowFileQueue().getPriorities()) {
+        for (final FlowFilePrioritizer comparator : 
flowFileQueue.getPriorities()) {
             
dto.getPrioritizers().add(comparator.getClass().getCanonicalName());
         }
 
@@ -699,6 +708,19 @@ public final class DtoFactory {
             }
         }
 
+        final LoadBalanceStrategy loadBalanceStrategy = 
flowFileQueue.getLoadBalanceStrategy();
+        
dto.setLoadBalancePartitionAttribute(flowFileQueue.getPartitioningAttribute());
+        dto.setLoadBalanceStrategy(loadBalanceStrategy.name());
+        
dto.setLoadBalanceCompression(flowFileQueue.getLoadBalanceCompression().name());
+
+        if (loadBalanceStrategy == LoadBalanceStrategy.DO_NOT_LOAD_BALANCE) {
+            
dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_NOT_CONFIGURED);
+        } else if (flowFileQueue.isActivelyLoadBalancing()) {
+            dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_ACTIVE);
+        } else {
+            dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_INACTIVE);
+        }
+
         return dto;
     }
 
@@ -3335,30 +3357,84 @@ public final class DtoFactory {
         return dto;
     }
 
+
     private ConnectionDiagnosticsDTO createConnectionDiagnosticsDto(final 
Connection connection) { // Bundles the connection's own DTO with a diagnostics snapshot built from the same connection.
         final ConnectionDiagnosticsDTO dto = new ConnectionDiagnosticsDTO();
         dto.setConnection(createConnectionDto(connection));
+        
dto.setAggregateSnapshot(createConnectionDiagnosticsSnapshotDto(connection)); // queue sizes are populated by the snapshot builder, not set directly here
+        return dto;
+    }
+
+    private ConnectionDiagnosticsSnapshotDTO 
createConnectionDiagnosticsSnapshotDto(final Connection connection) { // Builds the snapshot: whole-queue totals, the local partition, and any remote partitions.
+        final ConnectionDiagnosticsSnapshotDTO dto = new 
ConnectionDiagnosticsSnapshotDTO();
+
+        final QueueDiagnostics queueDiagnostics = 
connection.getFlowFileQueue().getQueueDiagnostics(); // NOTE(review): getFlowFileQueue() is looked up again just below; both uses could share one local
 
         final FlowFileQueue queue = connection.getFlowFileQueue();
         final QueueSize totalSize = queue.size();
         dto.setTotalByteCount(totalSize.getByteCount());
         dto.setTotalFlowFileCount(totalSize.getObjectCount());
 
-        final QueueSize activeSize = queue.getActiveQueueSize();
+        final LocalQueuePartitionDiagnostics localDiagnostics = 
queueDiagnostics.getLocalQueuePartitionDiagnostics();
+        
dto.setLocalQueuePartition(createLocalQueuePartitionDto(localDiagnostics)); // local partition is always mapped; no null check is made on localDiagnostics
+
+        final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = 
queueDiagnostics.getRemoteQueuePartitionDiagnostics();
+        if (remoteDiagnostics != null) { // when null, remoteQueuePartitions is simply never set on the DTO
+            final List<RemoteQueuePartitionDTO> remoteDiagnosticsDtos = 
remoteDiagnostics.stream()
+                .map(this::createRemoteQueuePartitionDto)
+                .collect(Collectors.toList());
+
+            dto.setRemoteQueuePartitions(remoteDiagnosticsDtos);
+        }
+
+        return dto;
+    }
+
+    private LocalQueuePartitionDTO createLocalQueuePartitionDto(final 
LocalQueuePartitionDiagnostics queueDiagnostics) { // Copies the local partition's active, in-flight and swap figures onto a new DTO.
+        final LocalQueuePartitionDTO dto = new LocalQueuePartitionDTO();
+
+        final QueueSize activeSize = queueDiagnostics.getActiveQueueSize();
+        dto.setActiveQueueByteCount(activeSize.getByteCount());
+        dto.setActiveQueueFlowFileCount(activeSize.getObjectCount());
+
+        final QueueSize inFlightSize = 
queueDiagnostics.getUnacknowledgedQueueSize(); // the DTO's "in flight" figures come from the unacknowledged queue size
+        dto.setInFlightByteCount(inFlightSize.getByteCount());
+        dto.setInFlightFlowFileCount(inFlightSize.getObjectCount());
+
+        final QueueSize swapSize = queueDiagnostics.getSwapQueueSize();
+        dto.setSwapByteCount(swapSize.getByteCount());
+        dto.setSwapFlowFileCount(swapSize.getObjectCount());
+        dto.setSwapFiles(queueDiagnostics.getSwapFileCount());
+
+        dto.setTotalByteCount(activeSize.getByteCount() + 
inFlightSize.getByteCount() + swapSize.getByteCount()); // total is derived here: active + in-flight + swapped
+        dto.setTotalFlowFileCount(activeSize.getObjectCount() + 
inFlightSize.getObjectCount() + swapSize.getObjectCount()); // same three-way sum, counted in FlowFiles
+
+        
dto.setAllActiveQueueFlowFilesPenalized(queueDiagnostics.isAllActiveFlowFilesPenalized());
+        
dto.setAnyActiveQueueFlowFilesPenalized(queueDiagnostics.isAnyActiveFlowFilePenalized());
+
+        return dto;
+    }
+
+    private RemoteQueuePartitionDTO createRemoteQueuePartitionDto(final 
RemoteQueuePartitionDiagnostics queueDiagnostics) { // Copies one remote partition's node id and active, in-flight and swap figures onto a new DTO.
+        final RemoteQueuePartitionDTO dto = new RemoteQueuePartitionDTO();
+
+        dto.setNodeIdentifier(queueDiagnostics.getNodeIdentifier());
+
+        final QueueSize activeSize = queueDiagnostics.getActiveQueueSize();
         dto.setActiveQueueByteCount(activeSize.getByteCount());
         dto.setActiveQueueFlowFileCount(activeSize.getObjectCount());
 
-        final QueueSize inFlightSize = queue.getUnacknowledgedQueueSize();
+        final QueueSize inFlightSize = 
queueDiagnostics.getUnacknowledgedQueueSize(); // the DTO's "in flight" figures come from the unacknowledged queue size
         dto.setInFlightByteCount(inFlightSize.getByteCount());
         dto.setInFlightFlowFileCount(inFlightSize.getObjectCount());
 
-        final QueueSize swapSize = queue.getSwapQueueSize();
+        final QueueSize swapSize = queueDiagnostics.getSwapQueueSize();
         dto.setSwapByteCount(swapSize.getByteCount());
         dto.setSwapFlowFileCount(swapSize.getObjectCount());
+        dto.setSwapFiles(queueDiagnostics.getSwapFileCount()); // no penalized flags are set on the remote DTO; those setter calls are removed below
 
-        dto.setSwapFiles(queue.getSwapFileCount());
-        
dto.setAllActiveQueueFlowFilesPenalized(queue.isAllActiveFlowFilesPenalized());
-        
dto.setAnyActiveQueueFlowFilesPenalized(queue.isAnyActiveFlowFilePenalized());
+        dto.setTotalByteCount(activeSize.getByteCount() + 
inFlightSize.getByteCount() + swapSize.getByteCount()); // total is derived here: active + in-flight + swapped
+        dto.setTotalFlowFileCount(activeSize.getObjectCount() + 
inFlightSize.getObjectCount() + swapSize.getObjectCount()); // same three-way sum, counted in FlowFiles
 
         return dto;
     }
@@ -3835,6 +3911,10 @@ public final class DtoFactory {
         copy.setzIndex(original.getzIndex());
         copy.setLabelIndex(original.getLabelIndex());
         copy.setBends(copy(original.getBends()));
+        
copy.setLoadBalancePartitionAttribute(original.getLoadBalancePartitionAttribute());
+        copy.setLoadBalanceStrategy(original.getLoadBalanceStrategy());
+        copy.setLoadBalanceCompression(original.getLoadBalanceCompression());
+        copy.setLoadBalanceStatus(original.getLoadBalanceStatus());
         copy.setVersionedComponentId(original.getVersionedComponentId());
 
         return copy;

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index bb4d2fe..062f151 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -25,6 +25,8 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.exception.ValidationException;
@@ -178,6 +180,18 @@ public class StandardConnectionDAO extends ComponentDAO 
implements ConnectionDAO
             connection.getFlowFileQueue().setPriorities(newPrioritizers);
         }
 
+        final String loadBalanceStrategyName = 
connectionDTO.getLoadBalanceStrategy();
+        final String loadBalancePartitionAttribute = 
connectionDTO.getLoadBalancePartitionAttribute();
+        if (isNotNull(loadBalanceStrategyName)) {
+            final LoadBalanceStrategy loadBalanceStrategy = 
LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
+            
connection.getFlowFileQueue().setLoadBalanceStrategy(loadBalanceStrategy, 
loadBalancePartitionAttribute);
+        }
+
+        final String loadBalanceCompressionName = 
connectionDTO.getLoadBalanceCompression();
+        if (isNotNull(loadBalanceCompressionName)) {
+            
connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(loadBalanceCompressionName));
+        }
+
         // update the connection state
         if (isNotNull(connectionDTO.getBends())) {
             final List<Position> bendPoints = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/pom.xml
index 60fa7ba..0a81441 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml
@@ -23,10 +23,10 @@
     <packaging>pom</packaging>
     <description>NiFi: Framework Bundle</description>
     <properties>
-        <nifi.registry.version>0.2.0</nifi.registry.version>
+        <nifi.registry.version>0.3.0</nifi.registry.version>
         <jersey.version>2.26</jersey.version>
         <spring.version>4.3.10.RELEASE</spring.version>
-        <spring.security.version>4.2.4.RELEASE</spring.security.version>    
+        <spring.security.version>4.2.4.RELEASE</spring.security.version>
         <jackson.version>2.9.7</jackson.version>
     </properties>
     <modules>
@@ -265,8 +265,8 @@
                 <artifactId>quartz</artifactId>
                 <version>2.2.1</version>
                 <exclusions>
-                    <!-- | Exclude the quartz 2.2.1 bundled version of c3p0 
-                    because it is lgpl licensed | We also don't use the JDBC 
related features 
+                    <!-- | Exclude the quartz 2.2.1 bundled version of c3p0
+                    because it is lgpl licensed | We also don't use the JDBC 
related features
                     of quartz for which the dependency would matter -->
                     <exclusion>
                         <groupId>c3p0</groupId>
@@ -288,7 +288,7 @@
                 <groupId>javax.mail</groupId>
                 <artifactId>mail</artifactId>
                 <version>1.4.7</version>
-            </dependency>         
+            </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>
                 <artifactId>httpclient</artifactId>
@@ -521,7 +521,7 @@
                 <artifactId>spring-core</artifactId>
                 <version>${spring.version}</version>
                 <exclusions>
-                    <!-- <artifactId>jcl-over-slf4j</artifactId> is used 
+                    <!-- <artifactId>jcl-over-slf4j</artifactId> is used
                     in dependencies section -->
                     <exclusion>
                         <groupId>commons-logging</groupId>

Reply via email to