Repository: nifi
Updated Branches:
  refs/heads/master 2201f7746 -> c7ff2fc5d


NIFI-5746: Use the Node Identifier's node address instead of the address 
obtained from the socket for RECEIVE provenance events; make SEND provenance 
events match the syntax of RECEIVE provenance events

NIFI-5746: Addressed an issue found during the review process

This closes #3109.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c7ff2fc5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c7ff2fc5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c7ff2fc5

Branch: refs/heads/master
Commit: c7ff2fc5dba7c8aaeae07f4819c320e2a96555f0
Parents: 2201f77
Author: Mark Payne <[email protected]>
Authored: Wed Oct 24 13:45:55 2018 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Fri Oct 26 10:24:12 2018 +0900

----------------------------------------------------------------------
 .../client/async/TransactionCompleteCallback.java |  3 ++-
 .../async/nio/NioAsyncLoadBalanceClient.java      |  2 +-
 .../clustered/partition/RemoteQueuePartition.java | 15 ++++++++-------
 .../server/ClusterLoadBalanceAuthorizer.java      |  6 +++---
 .../clustered/server/LoadBalanceAuthorizer.java   | 10 +++++++++-
 .../server/StandardLoadBalanceProtocol.java       | 17 +++++++----------
 .../queue/clustered/LoadBalancedQueueIT.java      |  4 ++--
 .../client/async/nio/TestLoadBalanceSession.java  |  4 ++--
 .../server/TestStandardLoadBalanceProtocol.java   | 18 +++++++++---------
 9 files changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
index 0c8f8b6..6e327cd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/TransactionCompleteCallback.java
@@ -17,10 +17,11 @@
 
 package org.apache.nifi.controller.queue.clustered.client.async;
 
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 
 import java.util.List;
 
 public interface TransactionCompleteCallback {
-    void onTransactionComplete(List<FlowFileRecord> flowFilesSent);
+    void onTransactionComplete(List<FlowFileRecord> flowFilesSent, 
NodeIdentifier nodeIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 753c1f4..e55dfcd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -256,7 +256,7 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
             } while (success);
 
             if (loadBalanceSession.isComplete()) {
-                
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent());
+                
loadBalanceSession.getPartition().getSuccessCallback().onTransactionComplete(loadBalanceSession.getFlowFilesSent(),
 nodeIdentifier);
             }
 
             return anySuccess;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index a78de55..854a3a5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -173,7 +173,8 @@ public class RemoteQueuePartition implements QueuePartition 
{
                         final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(flowFileQueue, flowFile);
                         repoRecord.markForAbort();
 
-                        updateRepositories(Collections.emptyList(), 
Collections.singleton(repoRecord));
+                        // We can send 'null' for the node identifier only 
because the list of FlowFiles sent is empty.
+                        updateRepositories(Collections.emptyList(), 
Collections.singleton(repoRecord), null);
 
                         // If unable to even connect to the node, go ahead and 
transfer all FlowFiles for this queue to the failure destination.
                         // In either case, transfer those FlowFiles that we 
failed to send.
@@ -204,12 +205,12 @@ public class RemoteQueuePartition implements 
QueuePartition {
 
         final TransactionCompleteCallback successCallback = new 
TransactionCompleteCallback() {
             @Override
-            public void onTransactionComplete(final List<FlowFileRecord> 
flowFilesSent) {
+            public void onTransactionComplete(final List<FlowFileRecord> 
flowFilesSent, final NodeIdentifier nodeIdentifier) {
                 // We've now completed the transaction. We must now update the 
repositories and "keep the books", acknowledging the FlowFiles
                 // with the queue so that its size remains accurate.
                 priorityQueue.acknowledge(flowFilesSent);
                 flowFileQueue.onTransfer(flowFilesSent);
-                updateRepositories(flowFilesSent, Collections.emptyList());
+                updateRepositories(flowFilesSent, Collections.emptyList(), 
nodeIdentifier);
             }
         };
 
@@ -230,7 +231,7 @@ public class RemoteQueuePartition implements QueuePartition 
{
      * @param flowFilesSent the FlowFiles that were sent to another node.
      * @param abortedRecords the Repository Records for any FlowFile whose 
content was missing.
      */
-    private void updateRepositories(final List<FlowFileRecord> flowFilesSent, 
final Collection<RepositoryRecord> abortedRecords) {
+    private void updateRepositories(final List<FlowFileRecord> flowFilesSent, 
final Collection<RepositoryRecord> abortedRecords, final NodeIdentifier 
nodeIdentifier) {
         // We update the Provenance Repository first. This way, even if we 
restart before we update the FlowFile repo, we have the record
         // that the data was sent in the Provenance Repository. We then update 
the content claims and finally the FlowFile Repository. We do it
         // in this order so that when the FlowFile repo is sync'ed to disk, we 
know which Content Claims are no longer in use. Updating the FlowFile
@@ -242,7 +243,7 @@ public class RemoteQueuePartition implements QueuePartition 
{
         // are ever created.
         final List<ProvenanceEventRecord> provenanceEvents = new 
ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size());
         for (final FlowFileRecord sent : flowFilesSent) {
-            provenanceEvents.add(createSendEvent(sent));
+            provenanceEvents.add(createSendEvent(sent, nodeIdentifier));
             provenanceEvents.add(createDropEvent(sent));
         }
 
@@ -279,7 +280,7 @@ public class RemoteQueuePartition implements QueuePartition 
{
         return record;
     }
 
-    private ProvenanceEventRecord createSendEvent(final FlowFileRecord 
flowFile) {
+    private ProvenanceEventRecord createSendEvent(final FlowFileRecord 
flowFile, final NodeIdentifier nodeIdentifier) {
 
         final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder()
                 .fromFlowFile(flowFile)
@@ -289,7 +290,7 @@ public class RemoteQueuePartition implements QueuePartition 
{
                 .setComponentType("Connection")
                 .setSourceQueueIdentifier(flowFileQueue.getIdentifier())
                 
.setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key()))
-                .setTransitUri("nifi:connection:" + 
flowFileQueue.getIdentifier());
+                .setTransitUri("nifi://" + nodeIdentifier.getApiAddress() + 
"/loadbalance/" + flowFileQueue.getIdentifier());
 
         final ContentClaim contentClaim = flowFile.getContentClaim();
         if (contentClaim != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
index 43187b5..f0d51c8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -40,10 +40,10 @@ public class ClusterLoadBalanceAuthorizer implements 
LoadBalanceAuthorizer {
     }
 
     @Override
-    public void authorize(final Collection<String> clientIdentities) throws 
NotAuthorizedException {
+    public String authorize(final Collection<String> clientIdentities) throws 
NotAuthorizedException {
         if (clientIdentities == null) {
             logger.debug("Client Identities is null, so assuming that Load 
Balancing communications are not secure. Authorizing client to participate in 
Load Balancing");
-            return;
+            return null;
         }
 
         final Set<String> nodeIds = 
clusterCoordinator.getNodeIdentifiers().stream()
@@ -53,7 +53,7 @@ public class ClusterLoadBalanceAuthorizer implements 
LoadBalanceAuthorizer {
         for (final String clientId : clientIdentities) {
             if (nodeIds.contains(clientId)) {
                 logger.debug("Client ID '{}' is in the list of Nodes in the 
Cluster. Authorizing Client to Load Balance data", clientId);
-                return;
+                return clientId;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
index 3a716e2..3abd328 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
@@ -20,5 +20,13 @@ package org.apache.nifi.controller.queue.clustered.server;
 import java.util.Collection;
 
 public interface LoadBalanceAuthorizer {
-    void authorize(Collection<String> clientIdentities) throws 
NotAuthorizedException;
+    /**
+     * Checks if any of the given identities is allowed to load balance data. 
If so, the identity that has been
+     * permitted is returned. If not, a NotAuthorizedException is thrown.
+     *
+     * @param clientIdentities the collection of identities to check
+     * @return the identity that is authorized, or null if the given 
collection of identities is null
+     * @throws NotAuthorizedException if none of the given identities is 
authorized to load balance data
+     */
+    String authorize(Collection<String> clientIdentities) throws 
NotAuthorizedException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 0f032df..dda71de 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -123,16 +123,13 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
         final InputStream in = new 
BufferedInputStream(socket.getInputStream());
         final OutputStream out = new 
BufferedOutputStream(socket.getOutputStream());
 
-        String peerDescription = socket.getInetAddress().toString();
+        String peerDescription = socket.getInetAddress().getHostName();
         if (socket instanceof SSLSocket) {
             final SSLSession sslSession = ((SSLSocket) socket).getSession();
 
             final Set<String> certIdentities;
             try {
                 certIdentities = getCertificateIdentities(sslSession);
-
-                final String dn = 
CertificateUtils.extractPeerDNFromSSLSocket(socket);
-                peerDescription = CertificateUtils.extractUsername(dn);
             } catch (final CertificateException e) {
                 throw new IOException("Failed to extract Client Certificate", 
e);
             }
@@ -140,7 +137,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
             logger.debug("Connection received from peer {}. Will perform 
authorization against Client Identities '{}'",
                 peerDescription, certIdentities);
 
-            authorizer.authorize(certIdentities);
+            peerDescription = authorizer.authorize(certIdentities);
             logger.debug("Client Identities {} are authorized to load balance 
data", certIdentities);
         }
 
@@ -155,7 +152,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
             return;
         }
 
-        receiveFlowFiles(in, out, peerDescription, version, 
socket.getInetAddress().getHostName());
+        receiveFlowFiles(in, out, peerDescription, version);
     }
 
     private Set<String> getCertificateIdentities(final SSLSession sslSession) 
throws CertificateException, SSLPeerUnverifiedException {
@@ -225,7 +222,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
     }
 
 
-    protected void receiveFlowFiles(final InputStream in, final OutputStream 
out, final String peerDescription, final int protocolVersion, final String 
nodeName) throws IOException {
+    protected void receiveFlowFiles(final InputStream in, final OutputStream 
out, final String peerDescription, final int protocolVersion) throws 
IOException {
         logger.debug("Receiving FlowFiles from {}", peerDescription);
         final long startTimestamp = System.currentTimeMillis();
 
@@ -309,7 +306,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
             }
 
             verifyChecksum(checksum, in, out, peerDescription, 
flowFilesReceived.size());
-            completeTransaction(in, out, peerDescription, flowFilesReceived, 
nodeName, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) 
flowFileQueue);
+            completeTransaction(in, out, peerDescription, flowFilesReceived, 
connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
         } catch (final Exception e) {
             // If any Exception occurs, we need to decrement the claimant 
counts for the Content Claims that we wrote to because
             // they are no longer needed.
@@ -324,7 +321,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
     }
 
     private void completeTransaction(final InputStream in, final OutputStream 
out, final String peerDescription, final List<RemoteFlowFileRecord> 
flowFilesReceived,
-                                     final String nodeName, final String 
connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue 
flowFileQueue) throws IOException {
+                                     final String connectionId, final long 
startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws 
IOException {
         final int completionIndicator = in.read();
         if (completionIndicator < 0) {
             throw new EOFException("Expected to receive a Transaction 
Completion Indicator from Peer " + peerDescription + " but encountered EOF");
@@ -343,7 +340,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
         }
 
         logger.debug("Received Complete Transaction indicator from Peer {}", 
peerDescription);
-        registerReceiveProvenanceEvents(flowFilesReceived, nodeName, 
connectionId, startTimestamp);
+        registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, 
connectionId, startTimestamp);
         updateFlowFileRepository(flowFilesReceived, flowFileQueue);
         transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index 17e9237..4871d72 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -101,7 +101,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class LoadBalancedQueueIT {
-    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds 
== null ? null : nodeIds.iterator().next();
     private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
         throw new NotAuthorizedException("Intentional Unit Test Failure - Not 
Authorized");
     };
@@ -269,7 +269,7 @@ public class LoadBalancedQueueIT {
                 }
             }
 
-            final int totalFlowFileCount = 6;
+            final int totalFlowFileCount = 7;
 
             // Wait up to 10 seconds for the server's FlowFile Repository to 
be updated
             final long endTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(10L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index efa5d73..20b6add 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -122,7 +122,7 @@ public class TestLoadBalanceSession {
         final FlowFileContentAccess contentAccess = contentMap::get;
 
         final RegisteredPartition partition = new 
RegisteredPartition("unit-test-connection", () -> false,
-            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
+            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
 
         final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress("localhost", port));
 
@@ -209,7 +209,7 @@ public class TestLoadBalanceSession {
         final FlowFileContentAccess contentAccess = contentMap::get;
 
         final RegisteredPartition partition = new 
RegisteredPartition("unit-test-connection", () -> false,
-            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
+            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff, nodeId) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
 
         final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress("localhost", port));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7ff2fc5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index d020c12..94f992f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -78,7 +78,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 public class TestStandardLoadBalanceProtocol {
-    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> nodeIds 
== null ? null : nodeIds.iterator().next();
     private FlowFileRepository flowFileRepo;
     private ContentRepository contentRepo;
     private ProvenanceRepository provenanceRepo;
@@ -204,7 +204,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.writeLong(checksum.getValue());
         dos.write(COMPLETE_TRANSACTION);
 
-        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
 
         final byte[] serverResponse = serverOutput.toByteArray();
         assertEquals(3, serverResponse.length);
@@ -266,7 +266,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.writeLong(checksum.getValue());
         dos.write(COMPLETE_TRANSACTION);
 
-        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
 
         final byte[] serverResponse = serverOutput.toByteArray();
         assertEquals(3, serverResponse.length);
@@ -331,7 +331,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.writeLong(checksum.getValue());
         dos.write(COMPLETE_TRANSACTION);
 
-        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
 
         final byte[] serverResponse = serverOutput.toByteArray();
         assertEquals(2, serverResponse.length);
@@ -393,7 +393,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.close();
 
         try {
-            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1);
             Assert.fail("Expected EOFException but none was thrown");
         } catch (final EOFException eof) {
             // expected
@@ -440,7 +440,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.write(COMPLETE_TRANSACTION);
 
         try {
-            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1);
             Assert.fail("Expected TransactionAbortedException but none was 
thrown");
         } catch (final TransactionAbortedException e) {
             // expected
@@ -492,7 +492,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.close();
 
         try {
-            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1);
             Assert.fail("Expected EOFException but none was thrown");
         } catch (final EOFException e) {
             // expected
@@ -541,7 +541,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.write(ABORT_TRANSACTION);
 
         try {
-            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1);
             Assert.fail("Expected TransactionAbortedException but none was 
thrown");
         } catch (final TransactionAbortedException e) {
             // expected
@@ -590,7 +590,7 @@ public class TestStandardLoadBalanceProtocol {
         dos.writeLong(checksum.getValue());
         dos.write(COMPLETE_TRANSACTION);
 
-        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1);
 
         final byte[] serverResponse = serverOutput.toByteArray();
         assertEquals(3, serverResponse.length);

Reply via email to