Repository: nifi Updated Branches: refs/heads/master 5872eb3c4 -> 619f1ffe8
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 5667094..efe2bd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -16,55 +16,13 @@ */ package org.apache.nifi.controller.repository; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.notNull; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; - import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.StandardFlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; @@ -99,6 +57,49 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestStandardProcessSession { private StandardProcessSession session; @@ -207,7 +208,7 @@ public class TestStandardProcessSession { final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); - final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, + final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, 0L, "0 B"); return Mockito.spy(actualQueue); } @@ -1515,7 +1516,7 @@ public class TestStandardProcessSession { final FlowFile originalFlowFile = session.get(); assertTrue(flowFileQueue.isActiveQueueEmpty()); - assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertTrue(flowFileQueue.isUnacknowledgedFlowFile()); final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() { @Override @@ -1538,7 +1539,7 @@ public class TestStandardProcessSession { assertFalse(flowFileQueue.isActiveQueueEmpty()); assertEquals(1, flowFileQueue.size().getObjectCount()); - assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertFalse(flowFileQueue.isUnacknowledgedFlowFile()); } @Test @@ -1552,7 +1553,7 @@ public class TestStandardProcessSession { final FlowFile originalFlowFile = session.get(); assertTrue(flowFileQueue.isActiveQueueEmpty()); - assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertTrue(flowFileQueue.isUnacknowledgedFlowFile()); final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() { @Override @@ -1569,7 +1570,7 @@ public class TestStandardProcessSession { session.rollback(); assertTrue(flowFileQueue.isActiveQueueEmpty()); assertEquals(0, flowFileQueue.size().getObjectCount()); - assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertFalse(flowFileQueue.isUnacknowledgedFlowFile()); session.rollback(); @@ -1578,7 +1579,7 @@ public class TestStandardProcessSession { final FlowFile originalRound2 = session.get(); assertTrue(flowFileQueue.isActiveQueueEmpty()); - assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertTrue(flowFileQueue.isUnacknowledgedFlowFile()); final FlowFile modifiedRound2 = session.write(originalRound2, new OutputStreamCallback() { @Override @@ -1591,13 +1592,13 @@ public class TestStandardProcessSession { session.checkpoint(); assertTrue(flowFileQueue.isActiveQueueEmpty()); - assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertTrue(flowFileQueue.isUnacknowledgedFlowFile()); session.commit(); // FlowFile transferred back to queue assertEquals(1, flowFileQueue.size().getObjectCount()); - assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertFalse(flowFileQueue.isUnacknowledgedFlowFile()); assertFalse(flowFileQueue.isActiveQueueEmpty()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 671c58d..402ce06 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -16,35 +16,20 @@ */ package org.apache.nifi.controller.repository; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.FlowFileQueueSize; import org.apache.nifi.controller.queue.ListFlowFileStatus; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.QueueDiagnostics; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.StandardFlowFileQueue; +import org.apache.nifi.controller.queue.StandardLocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.StandardQueueDiagnostics; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; @@ -69,6 +54,28 @@ import org.mockito.stubbing.Answer; import org.wali.MinimalLockingWriteAheadLog; import org.wali.WriteAheadRepository; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + @SuppressWarnings("deprecation") public class TestWriteAheadFlowFileRepository { @@ -92,6 +99,20 @@ public class TestWriteAheadFlowFileRepository { @Ignore("Intended only for local performance testing before/after making changes") public void testUpdatePerformance() throws IOException, InterruptedException { final FlowFileQueue queue = new FlowFileQueue() { + private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS; + + @Override + public void startLoadBalancing() { + } + + @Override + public void stopLoadBalancing() { + } + + @Override + public boolean isActivelyLoadBalancing() { + return false; + } @Override public String getIdentifier() { @@ -113,11 +134,6 @@ public class TestWriteAheadFlowFileRepository { } @Override - public int getSwapFileCount() { - return 0; - } - - @Override public void setPriorities(List<FlowFilePrioritizer> newPriorities) { } @@ -155,21 +171,6 @@ public class TestWriteAheadFlowFileRepository { } @Override - public QueueSize getUnacknowledgedQueueSize() { - return null; - } - - @Override - public QueueSize getActiveQueueSize() { - return size(); - } - - @Override - public QueueSize getSwapQueueSize() { - return null; - } - - @Override public void acknowledge(FlowFileRecord flowFile) { } @@ -178,12 +179,7 @@ public class TestWriteAheadFlowFileRepository { } @Override - public boolean isAllActiveFlowFilesPenalized() { - return false; - } - - @Override - public boolean isAnyActiveFlowFilePenalized() { + public boolean isUnacknowledgedFlowFile() { return false; } @@ -211,11 +207,6 @@ public class TestWriteAheadFlowFileRepository { } @Override - public long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords) { - return 0; - } - - @Override public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) { return null; } @@ -272,6 +263,44 @@ public class TestWriteAheadFlowFileRepository { @Override public void verifyCanList() throws IllegalStateException { } + + @Override + public QueueDiagnostics getQueueDiagnostics() { + final FlowFileQueueSize size = new FlowFileQueueSize(size().getObjectCount(), size().getByteCount(), 0, 0, 0, 0, 0); + return new StandardQueueDiagnostics(new StandardLocalQueuePartitionDiagnostics(size, false, false), Collections.emptyList()); + } + + @Override + public void lock() { + } + + @Override + public void unlock() { + } + + @Override + public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) { + } + + @Override + public LoadBalanceStrategy getLoadBalanceStrategy() { + return null; + } + + @Override + public void setLoadBalanceCompression(final LoadBalanceCompression compression) { + this.compression = compression; + } + + @Override + public LoadBalanceCompression getLoadBalanceCompression() { + return compression; + } + + @Override + public String getPartitioningAttribute() { + return null; + } }; @@ -370,7 +399,7 @@ public class TestWriteAheadFlowFileRepository { when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager(); - final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B"); + final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B"); when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); @@ -414,7 +443,7 @@ public class TestWriteAheadFlowFileRepository { records.add(rec2); repo.updateRepository(records); - final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue); + final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue, null); repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation); } @@ -546,7 +575,7 @@ public class TestWriteAheadFlowFileRepository { } @Override - public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException { + public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException { Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue); if (swapMap == null) { swapMap = new HashMap<>(); @@ -583,7 +612,7 @@ public class TestWriteAheadFlowFileRepository { } @Override - public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException { + public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) throws IOException { Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue); if (swapMap == null) { return null; @@ -631,5 +660,14 @@ public class TestWriteAheadFlowFileRepository { this.swappedRecords.clear(); } + @Override + public Set<String> getSwappedPartitionNames(FlowFileQueue queue) throws IOException { + return Collections.emptySet(); + } + + @Override + public String changePartitionName(String swapLocation, String newPartitionName) throws IOException { + return swapLocation; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks new file mode 100755 index 0000000..6db775d Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ks.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks new file mode 100755 index 0000000..5a4fd2d Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/localhost-ts.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml index 560363c..f6cb8c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml @@ -20,15 +20,15 @@ <pattern>%-4r [%t] %-5p %c - %m%n</pattern> </encoder> </appender> - + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> <file>./target/log</file> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%date %level [%thread] %logger{40} %msg%n</pattern> </encoder> </appender> - - + + <logger name="org.apache.nifi" level="INFO"/> <logger name="org.apache.nifi.controller.service" level="DEBUG"/> <logger name="org.apache.nifi.encrypt" level="DEBUG"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java index 7e87199..8908956 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java @@ -17,7 +17,7 @@ package org.apache.nifi.controller.repository.claim; public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> { - private final StandardResourceClaimManager claimManager; + private final ResourceClaimManager claimManager; private final String id; private final String container; private final String section; @@ -25,7 +25,7 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource private final int hashCode; private volatile boolean writable = true; - public StandardResourceClaim(final StandardResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) { + public StandardResourceClaim(final ResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) { this.claimManager = claimManager; this.container = container.intern(); this.section = section.intern(); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index def8e89..c186e54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -183,6 +183,13 @@ <nifi.cluster.flow.election.max.wait.time>5 mins</nifi.cluster.flow.election.max.wait.time> <nifi.cluster.flow.election.max.candidates /> + <!-- nifi.properties: cluster load balance properties --> + <nifi.cluster.load.balance.host></nifi.cluster.load.balance.host> + <nifi.cluster.load.balance.port>7430</nifi.cluster.load.balance.port> + <nifi.cluster.load.balance.connections.per.node>4</nifi.cluster.load.balance.connections.per.node> + <nifi.cluster.load.balance.max.thread.count>8</nifi.cluster.load.balance.max.thread.count> + <nifi.cluster.load.balance.comms.timeout>30 sec</nifi.cluster.load.balance.comms.timeout> + <nifi.cluster.request.replication.claim.timeout>15 secs</nifi.cluster.request.replication.claim.timeout> <!-- nifi.properties: zookeeper properties --> http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index c875bd9..a37e8b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -216,6 +216,13 @@ nifi.cluster.firewall.file=${nifi.cluster.firewall.file} nifi.cluster.flow.election.max.wait.time=${nifi.cluster.flow.election.max.wait.time} nifi.cluster.flow.election.max.candidates=${nifi.cluster.flow.election.max.candidates} +# cluster load balancing properties # +nifi.cluster.load.balance.host=${nifi.cluster.load.balance.host} +nifi.cluster.load.balance.port=${nifi.cluster.load.balance.port} +nifi.cluster.load.balance.connections.per.node=${nifi.cluster.load.balance.connections.per.node} +nifi.cluster.load.balance.max.thread.count=${nifi.cluster.load.balance.max.thread.count} +nifi.cluster.load.balance.comms.timeout=${nifi.cluster.load.balance.comms.timeout} + # zookeeper properties, used for cluster management # nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6c2c3bc..06625f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -88,7 +88,11 @@ import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileSummary; import org.apache.nifi.controller.queue.ListFlowFileState; import org.apache.nifi.controller.queue.ListFlowFileStatus; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.QueueDiagnostics; import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -165,6 +169,7 @@ import org.apache.nifi.web.api.dto.action.details.MoveDetailsDTO; import org.apache.nifi.web.api.dto.action.details.PurgeDetailsDTO; import org.apache.nifi.web.api.dto.diagnostics.ClassLoaderDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.GCDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.GarbageCollectionDiagnosticsDTO; @@ -173,7 +178,9 @@ import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMFlowDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.diagnostics.LocalQueuePartitionDTO; import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.RemoteQueuePartitionDTO; import org.apache.nifi.web.api.dto.diagnostics.RepositoryUsageDTO; import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO; import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO; @@ -669,11 +676,13 @@ public final class DtoFactory { dto.setDestination(createConnectableDto(connection.getDestination())); dto.setVersionedComponentId(connection.getVersionedComponentId().orElse(null)); - dto.setBackPressureObjectThreshold(connection.getFlowFileQueue().getBackPressureObjectThreshold()); - dto.setBackPressureDataSizeThreshold(connection.getFlowFileQueue().getBackPressureDataSizeThreshold()); - dto.setFlowFileExpiration(connection.getFlowFileQueue().getFlowFileExpiration()); + final FlowFileQueue flowFileQueue = connection.getFlowFileQueue(); + + dto.setBackPressureObjectThreshold(flowFileQueue.getBackPressureObjectThreshold()); + dto.setBackPressureDataSizeThreshold(flowFileQueue.getBackPressureDataSizeThreshold()); + dto.setFlowFileExpiration(flowFileQueue.getFlowFileExpiration()); dto.setPrioritizers(new ArrayList<String>()); - for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) { + for (final FlowFilePrioritizer comparator : flowFileQueue.getPriorities()) { dto.getPrioritizers().add(comparator.getClass().getCanonicalName()); } @@ -699,6 +708,19 @@ public final class DtoFactory { } } + final LoadBalanceStrategy loadBalanceStrategy = flowFileQueue.getLoadBalanceStrategy(); + dto.setLoadBalancePartitionAttribute(flowFileQueue.getPartitioningAttribute()); + dto.setLoadBalanceStrategy(loadBalanceStrategy.name()); + dto.setLoadBalanceCompression(flowFileQueue.getLoadBalanceCompression().name()); + + if (loadBalanceStrategy == LoadBalanceStrategy.DO_NOT_LOAD_BALANCE) { + dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_NOT_CONFIGURED); + } else if (flowFileQueue.isActivelyLoadBalancing()) { + dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_ACTIVE); + } else { + dto.setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_INACTIVE); + } + return dto; } @@ -3335,30 +3357,84 @@ public final class DtoFactory { return dto; } + private ConnectionDiagnosticsDTO createConnectionDiagnosticsDto(final Connection connection) { final ConnectionDiagnosticsDTO dto = new ConnectionDiagnosticsDTO(); dto.setConnection(createConnectionDto(connection)); + dto.setAggregateSnapshot(createConnectionDiagnosticsSnapshotDto(connection)); + return dto; + } + + private ConnectionDiagnosticsSnapshotDTO createConnectionDiagnosticsSnapshotDto(final Connection connection) { + final ConnectionDiagnosticsSnapshotDTO dto = new ConnectionDiagnosticsSnapshotDTO(); + + final QueueDiagnostics queueDiagnostics = connection.getFlowFileQueue().getQueueDiagnostics(); final FlowFileQueue queue = connection.getFlowFileQueue(); final QueueSize totalSize = queue.size(); dto.setTotalByteCount(totalSize.getByteCount()); dto.setTotalFlowFileCount(totalSize.getObjectCount()); - final QueueSize activeSize = queue.getActiveQueueSize(); + final LocalQueuePartitionDiagnostics localDiagnostics = queueDiagnostics.getLocalQueuePartitionDiagnostics(); + dto.setLocalQueuePartition(createLocalQueuePartitionDto(localDiagnostics)); + + final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = queueDiagnostics.getRemoteQueuePartitionDiagnostics(); + if (remoteDiagnostics != null) { + final List<RemoteQueuePartitionDTO> remoteDiagnosticsDtos = remoteDiagnostics.stream() + .map(this::createRemoteQueuePartitionDto) + .collect(Collectors.toList()); + + dto.setRemoteQueuePartitions(remoteDiagnosticsDtos); + } + + return dto; + } + + private LocalQueuePartitionDTO createLocalQueuePartitionDto(final LocalQueuePartitionDiagnostics queueDiagnostics) { + final LocalQueuePartitionDTO dto = new LocalQueuePartitionDTO(); + + final QueueSize activeSize = queueDiagnostics.getActiveQueueSize(); + dto.setActiveQueueByteCount(activeSize.getByteCount()); + dto.setActiveQueueFlowFileCount(activeSize.getObjectCount()); + + final QueueSize inFlightSize = queueDiagnostics.getUnacknowledgedQueueSize(); + dto.setInFlightByteCount(inFlightSize.getByteCount()); + dto.setInFlightFlowFileCount(inFlightSize.getObjectCount()); + + final QueueSize swapSize = queueDiagnostics.getSwapQueueSize(); + dto.setSwapByteCount(swapSize.getByteCount()); + dto.setSwapFlowFileCount(swapSize.getObjectCount()); + dto.setSwapFiles(queueDiagnostics.getSwapFileCount()); + + dto.setTotalByteCount(activeSize.getByteCount() + inFlightSize.getByteCount() + swapSize.getByteCount()); + dto.setTotalFlowFileCount(activeSize.getObjectCount() + inFlightSize.getObjectCount() + swapSize.getObjectCount()); + + dto.setAllActiveQueueFlowFilesPenalized(queueDiagnostics.isAllActiveFlowFilesPenalized()); + dto.setAnyActiveQueueFlowFilesPenalized(queueDiagnostics.isAnyActiveFlowFilePenalized()); + + return dto; + } + + private RemoteQueuePartitionDTO createRemoteQueuePartitionDto(final RemoteQueuePartitionDiagnostics queueDiagnostics) { + final RemoteQueuePartitionDTO dto = new RemoteQueuePartitionDTO(); + + dto.setNodeIdentifier(queueDiagnostics.getNodeIdentifier()); + + final QueueSize activeSize = queueDiagnostics.getActiveQueueSize(); dto.setActiveQueueByteCount(activeSize.getByteCount()); dto.setActiveQueueFlowFileCount(activeSize.getObjectCount()); - final QueueSize inFlightSize = queue.getUnacknowledgedQueueSize(); + final QueueSize inFlightSize = queueDiagnostics.getUnacknowledgedQueueSize(); dto.setInFlightByteCount(inFlightSize.getByteCount()); dto.setInFlightFlowFileCount(inFlightSize.getObjectCount()); - final QueueSize swapSize = queue.getSwapQueueSize(); + final QueueSize swapSize = queueDiagnostics.getSwapQueueSize(); dto.setSwapByteCount(swapSize.getByteCount()); dto.setSwapFlowFileCount(swapSize.getObjectCount()); + dto.setSwapFiles(queueDiagnostics.getSwapFileCount()); - dto.setSwapFiles(queue.getSwapFileCount()); - dto.setAllActiveQueueFlowFilesPenalized(queue.isAllActiveFlowFilesPenalized()); - dto.setAnyActiveQueueFlowFilesPenalized(queue.isAnyActiveFlowFilePenalized()); + dto.setTotalByteCount(activeSize.getByteCount() + inFlightSize.getByteCount() + swapSize.getByteCount()); + dto.setTotalFlowFileCount(activeSize.getObjectCount() + inFlightSize.getObjectCount() + swapSize.getObjectCount()); return dto; } @@ -3835,6 +3911,10 @@ public final class DtoFactory { copy.setzIndex(original.getzIndex()); copy.setLabelIndex(original.getLabelIndex()); copy.setBends(copy(original.getBends())); + copy.setLoadBalancePartitionAttribute(original.getLoadBalancePartitionAttribute()); + copy.setLoadBalanceStrategy(original.getLoadBalanceStrategy()); + copy.setLoadBalanceCompression(original.getLoadBalanceCompression()); + copy.setLoadBalanceStatus(original.getLoadBalanceStatus()); copy.setVersionedComponentId(original.getVersionedComponentId()); return copy; http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index bb4d2fe..062f151 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -25,6 +25,8 @@ import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.exception.ValidationException; @@ -178,6 +180,18 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO connection.getFlowFileQueue().setPriorities(newPrioritizers); } + final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy(); + final String loadBalancePartitionAttribute = connectionDTO.getLoadBalancePartitionAttribute(); + if (isNotNull(loadBalanceStrategyName)) { + final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName); + connection.getFlowFileQueue().setLoadBalanceStrategy(loadBalanceStrategy, loadBalancePartitionAttribute); + } + + final String loadBalanceCompressionName = connectionDTO.getLoadBalanceCompression(); + if (isNotNull(loadBalanceCompressionName)) { + connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(loadBalanceCompressionName)); + } + // update the connection state if (isNotNull(connectionDTO.getBends())) { final List<Position> bendPoints = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/pom.xml index 60fa7ba..0a81441 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml @@ -23,10 +23,10 @@ <packaging>pom</packaging> <description>NiFi: Framework Bundle</description> <properties> - <nifi.registry.version>0.2.0</nifi.registry.version> + <nifi.registry.version>0.3.0</nifi.registry.version> <jersey.version>2.26</jersey.version> <spring.version>4.3.10.RELEASE</spring.version> - <spring.security.version>4.2.4.RELEASE</spring.security.version> + <spring.security.version>4.2.4.RELEASE</spring.security.version> <jackson.version>2.9.7</jackson.version> </properties> <modules> @@ -265,8 +265,8 @@ <artifactId>quartz</artifactId> <version>2.2.1</version> <exclusions> - <!-- | Exclude the quartz 2.2.1 bundled version of c3p0 - because it is lgpl licensed | We also don't use the JDBC related features + <!-- | Exclude the quartz 2.2.1 bundled version of c3p0 + because it is lgpl licensed | We also don't use the JDBC related features of quartz for which the dependency would matter --> <exclusion> <groupId>c3p0</groupId> @@ -288,7 +288,7 @@ <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> - </dependency> + </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> @@ -521,7 +521,7 @@ <artifactId>spring-core</artifactId> <version>${spring.version}</version> <exclusions> - <!-- <artifactId>jcl-over-slf4j</artifactId> is used + <!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies section --> <exclusion> <groupId>commons-logging</groupId>
