http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
new file mode 100644
index 0000000..d6beff3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+import java.util.zip.GZIPInputStream;
+
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_PROTOCOL_NEGOTIATION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.QUEUE_FULL;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REQEUST_DIFFERENT_VERSION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.VERSION_ACCEPTED;
+
/**
 * Server-side implementation of the cluster Load Balance protocol: accepts a socket from a peer
 * node, authorizes the peer (when TLS is in use), negotiates a protocol version, and receives a
 * transaction of FlowFiles into a load-balanced connection's queue.
 */
public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
    private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class);

    // Sentinel return values used by negotiateProtocolVersion(): the peer closed the socket,
    // or no data arrived before the socket timeout on the first read.
    private static final int SOCKET_CLOSED = -1;
    private static final int NO_DATA_AVAILABLE = 0;

    // Framework-provided repositories/controller; this class only reads and writes through them.
    private final FlowFileRepository flowFileRepository;
    private final ContentRepository contentRepository;
    private final ProvenanceRepository provenanceRepository;
    private final FlowController flowController;
    private final LoadBalanceAuthorizer authorizer;

    // Per-thread scratch buffer reused for data frames and checksum bytes (see getDataBuffer()).
    private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>();
    // Monotonically increasing counter used as the lineage-start index for each FlowFile received.
    private final AtomicLong lineageStartIndex = new AtomicLong(0L);

    /**
     * @param flowFileRepository repository updated with the received FlowFile records
     * @param contentRepository repository that stores the received FlowFile content
     * @param provenanceRepository repository that records RECEIVE events for received FlowFiles
     * @param flowController used to resolve a Connection from the connection ID sent by the peer
     * @param authorizer authorizes the client certificate identities of TLS peers
     */
    public StandardLoadBalanceProtocol(final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ProvenanceRepository provenanceRepository,
                                       final FlowController flowController, final LoadBalanceAuthorizer authorizer) {
        this.flowFileRepository = flowFileRepository;
        this.contentRepository = contentRepository;
        this.provenanceRepository = provenanceRepository;
        this.flowController = flowController;
        this.authorizer = authorizer;
    }
+
+
    /**
     * Entry point for one peer connection: authorizes the peer (TLS only), negotiates the
     * protocol version, and then receives FlowFiles on the socket's streams.
     *
     * @param socket the socket connected to the peer; closed here only when the peer closed
     *               its side during version negotiation
     * @throws IOException if authorization, negotiation, or receipt fails
     */
    @Override
    public void receiveFlowFiles(final Socket socket) throws IOException {
        final InputStream in = new BufferedInputStream(socket.getInputStream());
        final OutputStream out = new BufferedOutputStream(socket.getOutputStream());

        // For plain sockets the peer is described by its address; for TLS we use the cert's username.
        String peerDescription = socket.getInetAddress().toString();
        if (socket instanceof SSLSocket) {
            final SSLSession sslSession = ((SSLSocket) socket).getSession();

            final Set<String> certIdentities;
            try {
                certIdentities = getCertificateIdentities(sslSession);

                final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
                peerDescription = CertificateUtils.extractUsername(dn);
            } catch (final CertificateException e) {
                throw new IOException("Failed to extract Client Certificate", e);
            }

            logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'",
                peerDescription, certIdentities);

            // NOTE(review): presumably authorize() throws if none of the identities is
            // authorized, which would abort before any data is read — TODO confirm.
            authorizer.authorize(certIdentities);
            logger.debug("Client Identities {} are authorized to load balance data", certIdentities);
        }

        final int version = negotiateProtocolVersion(in, out, peerDescription);

        // Peer closed the socket during negotiation.
        if (version == SOCKET_CLOSED) {
            socket.close();
            return;
        }
        // Socket timed out before the peer sent anything; leave the socket open for a later attempt.
        if (version == NO_DATA_AVAILABLE) {
            logger.debug("No data is available from {}", socket.getRemoteSocketAddress());
            return;
        }

        receiveFlowFiles(in, out, peerDescription, version, socket.getInetAddress().getHostName());
    }
+
+    private Set<String> getCertificateIdentities(final SSLSession sslSession) 
throws CertificateException, SSLPeerUnverifiedException {
+        final Certificate[] certs = sslSession.getPeerCertificates();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = 
CertificateUtils.convertAbstractX509Certificate(certs[0]);
+        cert.checkValidity();
+
+        final Set<String> identities = 
CertificateUtils.getSubjectAlternativeNames(cert).stream()
+                .map(CertificateUtils::extractUsername)
+                .collect(Collectors.toSet());
+
+        return identities;
+    }
+
+
+    protected int negotiateProtocolVersion(final InputStream in, final 
OutputStream out, final String peerDescription) throws IOException {
+        final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
+
+        for (int i=0;; i++) {
+            final int requestedVersion;
+            try {
+                requestedVersion = in.read();
+            } catch (final SocketTimeoutException ste) {
+                // If first iteration, then just consider this to indicate "no 
data available". Otherwise, we were truly expecting data.
+                if (i == 0) {
+                    logger.debug("SocketTimeoutException thrown when trying to 
negotiate Protocol Version");
+                    return NO_DATA_AVAILABLE;
+                }
+
+                throw ste;
+            }
+
+            if (requestedVersion < 0) {
+                logger.debug("Encountered End-of-File when receiving the the 
recommended Protocol Version. Returning -1 for the protocol version");
+                return -1;
+            }
+
+            final boolean supported = 
negotiator.isVersionSupported(requestedVersion);
+            if (supported) {
+                logger.debug("Peer {} requested version {} of the Load Balance 
Protocol. Accepting version.", peerDescription, requestedVersion);
+
+                out.write(VERSION_ACCEPTED);
+                out.flush();
+                return requestedVersion;
+            }
+
+            final Integer preferredVersion = 
negotiator.getPreferredVersion(requestedVersion);
+            if (preferredVersion == null) {
+                logger.debug("Peer {} requested version {} of the Load Balance 
Protocol. This version is not acceptable. Aborting communications.", 
peerDescription, requestedVersion);
+
+                out.write(ABORT_PROTOCOL_NEGOTIATION);
+                out.flush();
+                throw new IOException("Peer " + peerDescription + " requested 
that we use version " + requestedVersion
+                    + " of the Load Balance Protocol, but this version is 
unacceptable. Aborted communications.");
+            }
+
+            logger.debug("Peer {} requested version {} of the Load Balance 
Protocol. Requesting that peer change to version {} instead.", peerDescription, 
requestedVersion, preferredVersion);
+
+            out.write(REQEUST_DIFFERENT_VERSION);
+            out.write(preferredVersion);
+            out.flush();
+        }
+    }
+
+
    /**
     * Receives one transaction of FlowFiles from the peer: reads the target connection ID,
     * optionally answers a queue-space check, receives each FlowFile's attributes and content
     * into a single Content Claim, verifies the peer's checksum, and completes the transaction
     * (provenance + FlowFile repository + queue) before confirming to the peer.
     *
     * @param in stream from the peer; wrapped in a CRC32-checked stream so the checksum can be verified
     * @param out stream to the peer, used for space-check / checksum / completion responses
     * @param peerDescription human-readable description of the peer, for logging and errors
     * @param protocolVersion the negotiated protocol version
     * @param nodeName this node's host name, used to build the provenance transit URI
     * @throws IOException if the transaction is aborted or the streams fail
     */
    protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion, final String nodeName) throws IOException {
        logger.debug("Receiving FlowFiles from {}", peerDescription);
        final long startTimestamp = System.currentTimeMillis();

        // Everything read through dataIn is folded into this CRC32; the peer's checksum is later
        // read from the raw 'in' stream so the checksum bytes themselves are excluded.
        final Checksum checksum = new CRC32();
        final InputStream checkedInput = new CheckedInputStream(in, checksum);

        final DataInputStream dataIn = new DataInputStream(checkedInput);
        final String connectionId = getConnectionID(dataIn, peerDescription);
        if (connectionId == null) {
            logger.debug("Received no Connection ID from Peer {}. Will consider receipt of FlowFiles complete", peerDescription);
            return;
        }

        final Connection connection = flowController.getConnection(connectionId);
        if (connection == null) {
            logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", peerDescription, connectionId);
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID");
        }

        final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
        if (!(flowFileQueue instanceof LoadBalancedFlowFileQueue)) {
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but the Connection with that ID is " +
                    "not configured to allow for Load Balancing");
        }

        // The peer either asks us to verify queue space first, or tells us to skip the check.
        final int spaceCheck = dataIn.read();
        if (spaceCheck < 0) {
            throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription);
        }

        if (spaceCheck == CHECK_SPACE) {
            if (flowFileQueue.isFull()) {
                logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId);
                out.write(QUEUE_FULL);
                out.flush();
                return; // we're finished receiving flowfiles for now, and we'll restart the communication process.
            } else {
                logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with SPACE_AVAILABLE", peerDescription, connectionId);
                out.write(SPACE_AVAILABLE);
                out.flush();
            }
        } else if (spaceCheck != SKIP_SPACE_CHECK) {
            throw new TransactionAbortedException("Expected to receive a request to determine whether or not space was available for Connection with ID "
                + connectionId + " from Peer " + peerDescription + " but instead received value " + spaceCheck);
        }

        final LoadBalanceCompression compression = connection.getFlowFileQueue().getLoadBalanceCompression();
        logger.debug("Receiving FlowFiles from Peer {} for Connection {}; Compression = {}", peerDescription, connectionId, compression);

        // All FlowFiles in the transaction share a single Content Claim; each FlowFile's content
        // is appended at 'claimOffset' and the claimant count is incremented once per FlowFile.
        ContentClaim contentClaim = null;
        final List<RemoteFlowFileRecord> flowFilesReceived = new ArrayList<>();
        OutputStream contentClaimOut = null;
        long claimOffset = 0L;

        try {
            try {
                while (isMoreFlowFiles(dataIn, protocolVersion)) {
                    if (contentClaim == null) {
                        contentClaim = contentRepository.create(false);
                        contentClaimOut = contentRepository.write(contentClaim);
                    } else {
                        // (sic) "Claimaint" is the ContentRepository API's spelling.
                        contentRepository.incrementClaimaintCount(contentClaim);
                    }

                    final RemoteFlowFileRecord flowFile;
                    try {
                        flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
                    } catch (final Exception e) {
                        // Undo the claimant count taken for this (failed) FlowFile; the counts for
                        // FlowFiles already received are released in the outer catch below.
                        contentRepository.decrementClaimantCount(contentClaim);
                        throw e;
                    }

                    flowFilesReceived.add(flowFile);

                    claimOffset += flowFile.getFlowFile().getSize();
                }
            } finally {
                if (contentClaimOut != null) {
                    contentClaimOut.close();
                }
            }

            verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
            completeTransaction(in, out, peerDescription, flowFilesReceived, nodeName, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
        } catch (final Exception e) {
            // If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
            // they are no longer needed.
            for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
                contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
            }

            throw e;
        }

        logger.debug("Successfully received {} FlowFiles from Peer {} to Load Balance for Connection {}", flowFilesReceived.size(), peerDescription, connectionId);
    }
+
    /**
     * Handles the transaction-completion handshake. Only after the peer sends a "Complete
     * Transaction" indicator are the received FlowFiles made durable — provenance events first,
     * then the FlowFile Repository, then the queue — and only then is completion confirmed to
     * the peer.
     *
     * @throws TransactionAbortedException if the peer chose to abort
     * @throws EOFException if the stream ended before a completion indicator arrived
     * @throws IOException if an unexpected indicator is received or persistence fails
     */
    private void completeTransaction(final InputStream in, final OutputStream out, final String peerDescription, final List<RemoteFlowFileRecord> flowFilesReceived,
                                     final String nodeName, final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException {
        final int completionIndicator = in.read();
        if (completionIndicator < 0) {
            throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF");
        }

        if (completionIndicator == ABORT_TRANSACTION) {
            throw new TransactionAbortedException("Peer " + peerDescription + " chose to Abort Load Balance Transaction");
        }

        if (completionIndicator != COMPLETE_TRANSACTION) {
            // Protocol violation: tell the peer we're aborting before failing locally.
            logger.debug("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator + ". Sending back an Abort " +
                            "Transaction Flag.");
            out.write(ABORT_TRANSACTION);
            out.flush();
            throw new IOException("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator);
        }

        logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription);
        // Persist in this order so the FlowFiles are fully accounted for before the peer is
        // told the transfer succeeded (and may drop its own copies).
        registerReceiveProvenanceEvents(flowFilesReceived, nodeName, connectionId, startTimestamp);
        updateFlowFileRepository(flowFilesReceived, flowFileQueue);
        transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);

        out.write(CONFIRM_COMPLETE_TRANSACTION);
        out.flush();
    }
+
+    private void registerReceiveProvenanceEvents(final 
List<RemoteFlowFileRecord> flowFiles, final String nodeName, final String 
connectionId, final long startTimestamp) {
+        final long duration = System.currentTimeMillis() - startTimestamp;
+
+        final List<ProvenanceEventRecord> events = new 
ArrayList<>(flowFiles.size());
+        for (final RemoteFlowFileRecord remoteFlowFile : flowFiles) {
+            final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile();
+
+            final ProvenanceEventBuilder provenanceEventBuilder = new 
StandardProvenanceEventRecord.Builder()
+                    .fromFlowFile(flowFileRecord)
+                    .setEventType(ProvenanceEventType.RECEIVE)
+                    .setTransitUri("nifi://" + nodeName + "/loadbalance/" + 
connectionId)
+                    
.setSourceSystemFlowFileIdentifier(remoteFlowFile.getRemoteUuid())
+                    .setEventDuration(duration)
+                    .setComponentId(connectionId)
+                    .setComponentType("Load Balanced Connection");
+
+            final ContentClaim contentClaim = flowFileRecord.getContentClaim();
+            if (contentClaim != null) {
+                final ResourceClaim resourceClaim = 
contentClaim.getResourceClaim();
+                
provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(),
+                    contentClaim.getOffset() + 
flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize());
+            }
+
+            final ProvenanceEventRecord provenanceEvent = 
provenanceEventBuilder.build();
+            events.add(provenanceEvent);
+        }
+
+        provenanceRepository.registerEvents(events);
+    }
+
+    private void updateFlowFileRepository(final List<RemoteFlowFileRecord> 
flowFiles, final FlowFileQueue flowFileQueue) throws IOException {
+        final List<RepositoryRecord> repoRecords = flowFiles.stream()
+                .map(remoteFlowFile -> {
+                    final StandardRepositoryRecord record = new 
StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
+                    record.setDestination(flowFileQueue);
+                    return record;
+                })
+                .collect(Collectors.toList());
+        flowFileRepository.updateRepository(repoRecords);
+    }
+
+    private void transferFlowFilesToQueue(final List<RemoteFlowFileRecord> 
remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) {
+        final List<FlowFileRecord> flowFiles = 
remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList());
+        flowFileQueue.receiveFromPeer(flowFiles);
+    }
+
+    private void verifyChecksum(final Checksum checksum, final InputStream in, 
final OutputStream out, final String peerDescription, final int flowFileCount) 
throws IOException {
+        final long expectedChecksum = readChecksum(in);
+        if (checksum.getValue() == expectedChecksum) {
+            logger.debug("Checksum from Peer {} matched the checksum that was 
calculated. Writing confirmation.", peerDescription);
+            out.write(CONFIRM_CHECKSUM);
+            out.flush();
+        } else {
+            logger.error("Received {} FlowFiles from peer {} but the Checksum 
reported by the peer ({}) did not match the checksum that was calculated ({}). 
Will reject the transaction.",
+                    flowFileCount, peerDescription, expectedChecksum, 
checksum.getValue());
+            out.write(REJECT_CHECKSUM);
+            out.flush();
+            throw new TransactionAbortedException("Transaction with Peer " + 
peerDescription + " was aborted because the calculated checksum did not match 
the checksum provided by peer.");
+        }
+    }
+
+    private long readChecksum(final InputStream in) throws IOException {
+        final byte[] buffer = getDataBuffer();
+        StreamUtils.read(in, buffer,8 );
+        return ByteBuffer.wrap(buffer, 0, 8).getLong();
+    }
+
+    private byte[] getDataBuffer() {
+        byte[] buffer = dataBuffer.get();
+        if (buffer == null) {
+            buffer = new byte[65536 + 4096];
+            dataBuffer.set(buffer);
+        }
+
+        return buffer;
+    }
+
+    private String getConnectionID(final DataInputStream in, final String 
peerDescription) throws IOException {
+        try {
+            return in.readUTF();
+        } catch (final EOFException eof) {
+            logger.debug("Encountered EOFException when trying to receive 
Connection ID from Peer {}. Returning null for Connection ID", peerDescription);
+            return null;
+        }
+    }
+
+    private boolean isMoreFlowFiles(final DataInputStream in, final int 
protocolVersion) throws IOException {
+        final int indicator = in.read();
+        if (indicator < 0) {
+            throw new EOFException();
+        }
+
+        if (indicator == MORE_FLOWFILES) {
+            logger.debug("Peer indicates that there is another FlowFile in 
transaction");
+            return true;
+        }
+        if (indicator == NO_MORE_FLOWFILES) {
+            logger.debug("Peer indicates that there are no more FlowFiles in 
transaction");
+            return false;
+        }
+
+        throw new IOException("Expected to receive 'More FlowFiles' indicator 
(" + MORE_FLOWFILES
+            + ") or 'No More FlowFiles' indicator (" + NO_MORE_FLOWFILES + ") 
but received invalid value of " + indicator);
+    }
+
    /**
     * Receives a single FlowFile: a length-prefixed metadata section (attributes + lineage start
     * date + entry date, GZIP'd unless compression is disabled) followed by the content frames,
     * which are appended to the shared Content Claim.
     *
     * @param dis the (checksummed) stream from the peer
     * @param out the open Content Claim output stream to append content to
     * @param contentClaim the claim the content is written into
     * @param claimOffset the offset within the claim at which this FlowFile's content begins
     * @param peerDescription human-readable description of the peer, for logging
     * @param compression the queue's compression setting for load-balanced transfer
     * @return the received FlowFile along with the UUID it had on the sending node
     * @throws IOException if the metadata or content cannot be read
     */
    private RemoteFlowFileRecord receiveFlowFile(final DataInputStream dis, final OutputStream out, final ContentClaim contentClaim, final long claimOffset, final int protocolVersion,
                                                 final String peerDescription, final LoadBalanceCompression compression) throws IOException {
        // NOTE(review): metadataLength appears to count the on-the-wire (compressed) bytes,
        // since the limit is applied before GZIP decompression — TODO confirm against the sender.
        final int metadataLength = dis.readInt();

        DataInputStream metadataIn = new DataInputStream(new LimitingInputStream(dis, metadataLength));
        if (compression != LoadBalanceCompression.DO_NOT_COMPRESS) {
            // Metadata is compressed for both COMPRESS_ATTRIBUTES_ONLY and COMPRESS_ATTRIBUTES_AND_CONTENT.
            metadataIn = new DataInputStream(new GZIPInputStream(metadataIn));
        }

        final Map<String, String> attributes = readAttributes(metadataIn);

        logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription);

        final long lineageStartDate = metadataIn.readLong();
        final long entryDate = metadataIn.readLong();

        // Content is compressed only in the COMPRESS_ATTRIBUTES_AND_CONTENT mode.
        final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);

        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
            .id(flowFileRepository.getNextFlowFileSequence())
            .addAttributes(attributes)
            .contentClaim(contentClaimTriple.getContentClaim())
            .contentClaimOffset(contentClaimTriple.getClaimOffset())
            .size(contentClaimTriple.getContentLength())
            .entryDate(entryDate)
            .lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement())
            .build();

        logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());
        // Keep the sender's UUID so the provenance RECEIVE event can reference the source FlowFile.
        return new RemoteFlowFileRecord(attributes.get(CoreAttributes.UUID.key()), flowFileRecord);
    }
+
+    private Map<String, String> readAttributes(final DataInputStream in) 
throws IOException {
+        final int attributeCount = in.readInt();
+        final Map<String, String> attributes = new HashMap<>();
+        for (int i = 0; i < attributeCount; i++) {
+            final String key = readLongString(in);
+            final String value = readLongString(in);
+
+            logger.trace("Received attribute '{}' = '{}'", key, value);
+            attributes.put(key, value);
+        }
+
+        return attributes;
+    }
+
+    private String readLongString(final DataInputStream in) throws IOException 
{
+        final int stringLength = in.readInt();
+        final byte[] bytes = new byte[stringLength];
+        StreamUtils.fillBuffer(in, bytes);
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
    /**
     * Consumes the FlowFile's content, sent as a sequence of data frames: each frame is a
     * one-byte indicator, a 2-byte unsigned length, then that many bytes (GZIP-compressed when
     * {@code compressed} is true). Frames are appended to the Content Claim's output stream
     * until the peer sends a NO_DATA_FRAME indicator.
     *
     * @return the claim, starting offset, and total (decompressed) length of the content written
     * @throws TransactionAbortedException if the peer requests an abort mid-content
     * @throws EOFException if the stream ends mid-frame or before an indicator
     * @throws IOException if an unrecognized indicator is received
     */
    private ContentClaimTriple consumeContent(final DataInputStream in, final OutputStream out, final ContentClaim contentClaim, final long claimOffset,
                                              final String peerDescription, final boolean compressed) throws IOException {
        logger.debug("Consuming content from Peer {}", peerDescription);

        int dataFrameIndicator = in.read();
        if (dataFrameIndicator < 0) {
            throw new EOFException("Encountered End-of-File when expecting to read Data Frame Indicator from Peer " + peerDescription);
        }
        if (dataFrameIndicator == NO_DATA_FRAME) {
            // Zero-byte FlowFile: no content, no claim reference.
            logger.debug("Peer {} indicates that there is no Data Frame for the FlowFile", peerDescription);
            return new ContentClaimTriple(null, 0L, 0L);
        }
        if (dataFrameIndicator == ABORT_TRANSACTION) {
            throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
        }
        if (dataFrameIndicator != DATA_FRAME_FOLLOWS) {
            throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
        }

        int dataFrameLength = in.readUnsignedShort();
        logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);

        byte[] buffer = getDataBuffer();

        long claimLength = 0;
        while (true) {
            // Limit reads to this frame's on-the-wire length, count the raw bytes consumed,
            // and decompress on the fly when content compression is enabled.
            final InputStream limitedIn = new LimitedInputStream(in, dataFrameLength);
            final ByteCountingInputStream bcis = new ByteCountingInputStream(limitedIn);
            final InputStream contentIn = compressed ? new GZIPInputStream(bcis) : bcis;
            // NOTE(review): fillBuffer(..., false) stops at the buffer's end (65536 + 4096 bytes);
            // if a compressed frame decompressed to more than that, the excess would appear to be
            // dropped silently — TODO confirm the sender bounds the decompressed frame size.
            final int decompressedSize = StreamUtils.fillBuffer(contentIn, buffer, false);

            if (bcis.getBytesRead() < dataFrameLength) {
                throw new EOFException("Expected to receive a Data Frame of length " + dataFrameLength + " bytes but received only " + bcis.getBytesRead() + " bytes");
            }

            out.write(buffer, 0, decompressedSize);

            claimLength += decompressedSize;

            dataFrameIndicator = in.read();
            if (dataFrameIndicator < 0) {
                throw new EOFException("Encountered End-of-File when expecting to receive a Data Frame Indicator");
            }
            if (dataFrameIndicator == NO_DATA_FRAME) {
                logger.debug("Peer {} indicated that no more data frames are available", peerDescription);
                break;
            }
            if (dataFrameIndicator == ABORT_TRANSACTION) {
                logger.debug("Peer {} requested that transaction be aborted by sending Data Frame Length of {}", peerDescription, dataFrameLength);
                throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
            }
            if (dataFrameIndicator != DATA_FRAME_FOLLOWS) {
                throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
            }

            dataFrameLength = in.readUnsignedShort();
            logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);
        }

        return new ContentClaimTriple(contentClaim, claimOffset, claimLength);
    }
+
+    private static class ContentClaimTriple {
+        private final ContentClaim contentClaim;
+        private final long claimOffset;
+        private final long contentLength;
+
+        public ContentClaimTriple(ContentClaim contentClaim, long claimOffset, 
long contentLength) {
+            this.contentClaim = contentClaim;
+            this.claimOffset = claimOffset;
+            this.contentLength = contentLength;
+        }
+
+        public ContentClaim getContentClaim() {
+            return contentClaim;
+        }
+
+        public long getClaimOffset() {
+            return claimOffset;
+        }
+
+        public long getContentLength() {
+            return contentLength;
+        }
+    }
+
+    private static class RemoteFlowFileRecord {
+        private final String remoteUuid;
+        private final FlowFileRecord flowFile;
+
+        public RemoteFlowFileRecord(final String remoteUuid, final 
FlowFileRecord flowFile) {
+            this.remoteUuid = remoteUuid;
+            this.flowFile = flowFile;
+        }
+
+        public String getRemoteUuid() {
+            return remoteUuid;
+        }
+
+        public FlowFileRecord getFlowFile() {
+            return flowFile;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java
new file mode 100644
index 0000000..43fd4c3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import java.io.IOException;
+
+public class TransactionAbortedException extends IOException {
+    public TransactionAbortedException(final String message) {
+        super(message);
+    }
+
+    public TransactionAbortedException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
index a407371..ecf5046 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java
@@ -32,9 +32,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.util.Connectables;
 
-/**
- *
- */
 public class RepositoryContext {
 
     private final Connectable connectable;

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 9741cff..3496795 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -482,6 +482,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 LOG.debug(timingInfo.toString());
             }
         } catch (final Exception e) {
+            LOG.error("Failed to commit session {}. Will roll back.", this, e);
+
             try {
                 // if we fail to commit the session, we need to roll back
                 // the checkpoints as well because none of the checkpoints
@@ -1164,11 +1166,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     private void acknowledgeRecords() {
-        for (final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry : 
unacknowledgedFlowFiles.entrySet()) {
-            LOG.trace("Acknowledging {} for {}", entry.getValue(), 
entry.getKey());
+        final Iterator<Map.Entry<FlowFileQueue, Set<FlowFileRecord>>> itr = 
unacknowledgedFlowFiles.entrySet().iterator();
+        while (itr.hasNext()) {
+            final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry = 
itr.next();
+            itr.remove();
+
             entry.getKey().acknowledge(entry.getValue());
         }
-        unacknowledgedFlowFiles.clear();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index e95845a..0a570c3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.controller.serialization;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceState;
@@ -48,12 +40,21 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 public class FlowFromDOMFactory {
     private static final Logger logger = 
LoggerFactory.getLogger(FlowFromDOMFactory.class);
 
@@ -295,6 +296,10 @@ public class FlowFromDOMFactory {
         }
         dto.setPrioritizers(prioritizerClasses);
 
+        dto.setLoadBalanceStrategy(getString(element, "loadBalanceStrategy"));
+        dto.setLoadBalancePartitionAttribute(getString(element, 
"partitioningAttribute"));
+        dto.setLoadBalanceCompression(getString(element, 
"loadBalanceCompression"));
+
         return dto;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index c3d0c1f..597c8fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.controller.serialization;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.ConnectableType;
@@ -70,6 +51,24 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Serializes a Flow Controller as XML to an output stream.
  *
@@ -556,6 +555,10 @@ public class StandardFlowSerializer implements 
FlowSerializer<Document> {
             addTextElement(element, "queuePrioritizerClass", className);
         }
 
+        addTextElement(element, "loadBalanceStrategy", 
connection.getFlowFileQueue().getLoadBalanceStrategy().name());
+        addTextElement(element, "partitioningAttribute", 
connection.getFlowFileQueue().getPartitioningAttribute());
+        addTextElement(element, "loadBalanceCompression", 
connection.getFlowFileQueue().getLoadBalanceCompression().name());
+
         parentElement.appendChild(element);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index b365de2..d3248b5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -17,17 +17,6 @@
 
 package org.apache.nifi.controller.state.manager;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.net.ssl.SSLContext;
-
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.bundle.Bundle;
@@ -57,9 +46,21 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class StandardStateManagerProvider implements StateManagerProvider{
     private static final Logger logger = 
LoggerFactory.getLogger(StandardStateManagerProvider.class);
 
+    private static StateManagerProvider provider;
+
     private final ConcurrentMap<String, StateManager> stateManagers = new 
ConcurrentHashMap<>();
     private final StateProvider localStateProvider;
     private final StateProvider clusterStateProvider;
@@ -69,7 +70,11 @@ public class StandardStateManagerProvider implements 
StateManagerProvider{
         this.clusterStateProvider = clusterStateProvider;
     }
 
-    public static StateManagerProvider create(final NiFiProperties properties, 
final VariableRegistry variableRegistry) throws ConfigParseException, 
IOException {
+    public static synchronized StateManagerProvider create(final 
NiFiProperties properties, final VariableRegistry variableRegistry) throws 
ConfigParseException, IOException {
+        if (provider != null) {
+            return provider;
+        }
+
         final StateProvider localProvider = 
createLocalStateProvider(properties,variableRegistry);
 
         final StateProvider clusterProvider;
@@ -79,7 +84,8 @@ public class StandardStateManagerProvider implements 
StateManagerProvider{
             clusterProvider = null;
         }
 
-        return new StandardStateManagerProvider(localProvider, 
clusterProvider);
+        provider = new StandardStateManagerProvider(localProvider, 
clusterProvider);
+        return provider;
     }
 
     private static StateProvider createLocalStateProvider(final NiFiProperties 
properties, final VariableRegistry variableRegistry) throws IOException, 
ConfigParseException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index e1846a0..7da5702 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.fingerprint;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.stream.Stream;
-
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
@@ -57,6 +38,24 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
 /**
  * <p>Creates a fingerprint of a flow.xml. The order of elements or attributes 
in the flow.xml does not influence the fingerprint generation.
  *
@@ -606,6 +605,10 @@ public class FingerprintFactory {
 
         appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(connectionElem, "name"));
 
+        appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(connectionElem, "loadBalanceStrategy"));
+        appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(connectionElem, "partitioningAttribute"));
+        appendFirstValue(builder, 
DomUtils.getChildNodesByTagName(connectionElem, "loadBalanceCompression"));
+
         // relationships
         final NodeList relationshipElems = 
DomUtils.getChildNodesByTagName(connectionElem, "relationship");
         final List<Element> sortedRelationshipElems = 
sortElements(relationshipElems, getConnectionRelationshipsComparator());

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index e173cc6..2fee764 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -54,6 +54,8 @@ import 
org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -473,6 +475,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             rpg.shutdown();
         }
 
+        for (final Connection connection : procGroup.getConnections()) {
+            connection.getFlowFileQueue().stopLoadBalancing();
+        }
+
         // Recursively shutdown child groups.
         for (final ProcessGroup group : procGroup.getProcessGroups()) {
             shutdown(group);
@@ -957,7 +963,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     public Collection<ProcessorNode> getProcessors() {
         readLock.lock();
         try {
-            return new HashSet<>(processors.values());
+            return new ArrayList<>(processors.values());
         } finally {
             readLock.unlock();
         }
@@ -1112,6 +1118,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             connectionToRemove.verifyCanDelete();
 
+            connectionToRemove.getFlowFileQueue().stopLoadBalancing();
+
             final Connectable source = connectionToRemove.getSource();
             final Connectable dest = connectionToRemove.getDestination();
 
@@ -3863,6 +3871,23 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             .collect(Collectors.toList());
 
         queue.setPriorities(prioritizers);
+
+        final String loadBalanceStrategyName = 
proposed.getLoadBalanceStrategy();
+        if (loadBalanceStrategyName == null) {
+            
queue.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, 
proposed.getPartitioningAttribute());
+        } else {
+            final LoadBalanceStrategy loadBalanceStrategy = 
LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
+            final String partitioningAttribute = 
proposed.getPartitioningAttribute();
+
+            queue.setLoadBalanceStrategy(loadBalanceStrategy, 
partitioningAttribute);
+        }
+
+        final String compressionName = proposed.getLoadBalanceCompression();
+        if (compressionName == null) {
+            
queue.setLoadBalanceCompression(LoadBalanceCompression.DO_NOT_COMPRESS);
+        } else {
+            
queue.setLoadBalanceCompression(LoadBalanceCompression.valueOf(compressionName));
+        }
     }
 
     private Connection addConnection(final ProcessGroup destinationGroup, 
final VersionedConnection proposed, final String componentIdSeed) {
@@ -3884,6 +3909,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         destinationGroup.addConnection(connection);
         updateConnection(connection, proposed);
 
+        flowController.onConnectionAdded(connection);
         return connection;
     }
 
@@ -4470,7 +4496,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 }
             }
 
-            // Ensure that all Prioritizers are instantiate-able.
+            // Ensure that all Prioritizers are instantiate-able and that any 
load balancing configuration is correct
             final Map<String, VersionedConnection> proposedConnections = new 
HashMap<>();
             findAllConnections(updatedFlow.getFlowContents(), 
proposedConnections);
 
@@ -4488,6 +4514,16 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                         }
                     }
                 }
+
+                final String loadBalanceStrategyName = 
connectionToAdd.getLoadBalanceStrategy();
+                if (loadBalanceStrategyName != null) {
+                    try {
+                        LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
+                    } catch (final IllegalArgumentException iae) {
+                        throw new IllegalArgumentException("Unable to create Connection with Load Balance Strategy of '" + loadBalanceStrategyName
+                                + "' because this is not a known Load Balance Strategy", iae);
+                    }
+                }
             }
         } finally {
             readLock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index 8d6e5e3..243e725 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.registry.flow.mapping;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -73,6 +60,19 @@ import 
org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
 import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
 import org.apache.nifi.remote.RemoteGroupPort;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 
 public class NiFiRegistryFlowMapper {
     // We need to keep a mapping of component id to versionedComponentId as we 
transform these objects. This way, when
@@ -247,6 +247,10 @@ public class NiFiRegistryFlowMapper {
         
versionedConnection.setSelectedRelationships(connection.getRelationships().stream().map(Relationship::getName).collect(Collectors.toSet()));
         versionedConnection.setzIndex(connection.getZIndex());
 
+        final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
+        
versionedConnection.setLoadBalanceStrategy(flowFileQueue.getLoadBalanceStrategy().name());
+        
versionedConnection.setPartitioningAttribute(flowFileQueue.getPartitioningAttribute());
+
         versionedConnection.setBends(connection.getBendPoints().stream()
             .map(this::mapPosition)
             .collect(Collectors.toList()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index 9e81d22..3149ee1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -30,22 +30,22 @@
 
             <!-- Groupings of Processors/Ports -->
             <xs:element name="rootGroup" type="RootProcessGroupType" />
-            
+
             <!-- This exists for backward compatibility between NiFi 1.x and 
NiFi 0.x. Any Controller Service that is listed
             here is assigned to the root group -->
             <xs:element name="controllerServices" 
type="ControllerServicesType" minOccurs="0" maxOccurs="1" />
-            
+
             <xs:element name="reportingTasks" type="ReportingTasksType" 
minOccurs="0" maxOccurs="1" />
         </xs:sequence>
         <xs:attribute name="encoding-version" type="xs:string"/>
     </xs:complexType>
-       
+
     <xs:complexType name="RegistriesType">
         <xs:sequence>
             <xs:element name="flowRegistry" type="FlowRegistryType" 
minOccurs="0" maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="FlowRegistryType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -54,7 +54,7 @@
             <xs:element name="description" type="xs:string" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <!-- the processor "id" is a key that should be valid within each 
flowController-->
     <xs:complexType name="ProcessorType">
         <xs:sequence>
@@ -80,21 +80,21 @@
             <xs:element name="bundle" type="BundleType" />
 
             <!-- the number of concurrent tasks for this configured
-            processor that can be executed at any one time. This value can be 
0 
+            processor that can be executed at any one time. This value can be 0
             IFF schedulingStrategy is EVENT_DRIVEN -->
             <xs:element name="maxConcurrentTasks" 
type="xs:nonNegativeInteger"/>
 
             <xs:element name="schedulingPeriod" type="NonEmptyStringType"/>
-            
+
             <xs:element name="penalizationPeriod" type="TimePeriod" />
-            
+
             <xs:element name="yieldPeriod" type="TimePeriod" />
-            
+
             <xs:element name="bulletinLevel" type="LogLevel" />
 
             <!-- whether or not this processor is loss-tolerant -->
             <xs:element name="lossTolerant" type="xs:boolean" />
-            
+
             <xs:element name="scheduledState" type="ScheduledState" />
 
             <!-- "isolated" is deprecated.
@@ -116,12 +116,12 @@
 
             <!-- Annotation data used for more advanced configuration -->
             <xs:element name="annotationData" type="xs:string" minOccurs="0" 
maxOccurs="1" />
-            
+
             <!-- Indicates that a relationship with the given name can be 
auto-terminated -->
             <xs:element name="autoTerminatedRelationship" type="xs:string" 
minOccurs="0" maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <!-- The "name" should be a key within each processor-->
     <xs:complexType name="PropertyType">
         <xs:sequence>
@@ -130,13 +130,13 @@
             <xs:element name="value" type="xs:string" minOccurs="0" 
maxOccurs="1"/>
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:simpleType name="NonEmptyStringType">
         <xs:restriction base="xs:string">
             <xs:minLength value="1"/>
         </xs:restriction>
     </xs:simpleType>
-    
+
     <xs:simpleType name="ScheduledState">
         <xs:restriction base="xs:string">
             <xs:enumeration value="DISABLED"></xs:enumeration>
@@ -144,7 +144,7 @@
             <xs:enumeration value="STOPPED"></xs:enumeration>
         </xs:restriction>
     </xs:simpleType>
-    
+
     <xs:complexType name="ProcessGroupType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -153,13 +153,13 @@
             <xs:element name="position" type="PositionType" />
             <xs:element name="comment" type="xs:string" />
                <xs:element name="versionControlInformation" 
type="VersionControlInformation" minOccurs="0" maxOccurs="1" />
-            
+
             <!-- Each "processor" defines the actual dataflow work horses that 
make dataflow happen-->
             <xs:element name="processor" type="ProcessorType" minOccurs="0" 
maxOccurs="unbounded"/>
 
             <xs:element name="inputPort" type="PortType" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="outputPort" type="PortType" minOccurs="0" 
maxOccurs="unbounded"/>
-            
+
             <xs:element name="label" type="LabelType" minOccurs="0" 
maxOccurs="unbounded" />
             <xs:element name="funnel" type="FunnelType" minOccurs="0" 
maxOccurs="unbounded" />
             <xs:element name="processGroup" type="ProcessGroupType" 
minOccurs="0" maxOccurs="unbounded"        />
@@ -170,12 +170,12 @@
             <xs:element name="variable" type="VariableType" minOccurs="0" 
maxOccurs="unbounded"  />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="VariableType">
         <xs:attribute name="name" />
         <xs:attribute name="value" />
     </xs:complexType>
-    
+
     <xs:complexType name="VersionControlInformation">
         <xs:sequence>
             <xs:element name="registryId" type="NonEmptyStringType" />
@@ -187,7 +187,7 @@
             <xs:element name="version" type="NonEmptyStringType" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <!-- Same as ProcessGroupType except that instead of input ports & output 
ports being of type PortType,
     they are of type RootGroupPortType -->
     <xs:complexType name="RootProcessGroupType">
@@ -197,13 +197,13 @@
             <xs:element name="name" type="NonEmptyStringType" />
             <xs:element name="position" type="PositionType" />
             <xs:element name="comment" type="xs:string" />
-               
+
             <!-- Each "processor" defines the actual dataflow work horses that 
make dataflow happen-->
             <xs:element name="processor" type="ProcessorType" minOccurs="0" 
maxOccurs="unbounded"/>
 
             <xs:element name="inputPort" type="RootGroupPortType" 
minOccurs="0" maxOccurs="unbounded"/>
             <xs:element name="outputPort" type="RootGroupPortType" 
minOccurs="0" maxOccurs="unbounded"/>
-            
+
             <xs:element name="label" type="LabelType" minOccurs="0" 
maxOccurs="unbounded" />
             <xs:element name="funnel" type="FunnelType" minOccurs="0" 
maxOccurs="unbounded" />
             <xs:element name="processGroup" type="ProcessGroupType" 
minOccurs="0" maxOccurs="unbounded"        />
@@ -213,7 +213,7 @@
             <xs:element name="template" type="TemplateType" minOccurs="0" 
maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="FunnelType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -221,7 +221,7 @@
             <xs:element name="position" type="PositionType" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="LabelType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -232,7 +232,7 @@
             <xs:element name="value" type="xs:string" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="RemoteProcessGroupType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -259,7 +259,7 @@
             <xs:element name="outputPort" type="RemoteGroupPortType" 
minOccurs="0" maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="ConnectionType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -276,11 +276,11 @@
             <xs:element name="destinationType" type="NonEmptyStringType" />
             <!-- relationship will be an empty string for Ports. -->
             <xs:element name="relationship" type="xs:string" minOccurs="1" 
maxOccurs="unbounded" />
-            
+
             <!-- "maxWorkQueueSize" is the maximum size this processors work 
queue should be
             before other processors are expected (not required) to stop 
loading new files onto it.-->
             <xs:element name="maxWorkQueueSize" type="xs:nonNegativeInteger"/>
-            
+
             <xs:element name="maxWorkQueueDataSize" type="DataSize" 
minOccurs="0" maxOccurs="1" />
 
             <!-- "flowFileExpirationMinutes" is the maximum time that a flow 
file may remain
@@ -293,9 +293,13 @@
             <!-- "queuePrioritizerClass" are Java classes that can be used to 
prioritize the work queues for this
             processor.  The order of the prioritizers is important.-->
             <xs:element name="queuePrioritizerClass" type="xs:string" 
minOccurs="0" maxOccurs="unbounded"/>
+
+            <xs:element name="loadBalanceStrategy" type="xs:string" 
minOccurs="0" maxOccurs="1" />
+            <xs:element name="partitioningAttribute" type="xs:string" 
minOccurs="0" maxOccurs="1" />
+            <xs:element name="loadBalanceCompression" type="xs:string" 
minOccurs="0" maxOccurs="1" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="PortType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -306,7 +310,7 @@
             <xs:element name="scheduledState" type="ScheduledState" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="RootGroupPortType">
         <xs:complexContent>
             <xs:extension base="PortType">
@@ -318,7 +322,7 @@
             </xs:extension>
         </xs:complexContent>
     </xs:complexType>
-    
+
     <xs:complexType name="RemoteGroupPortType">
         <xs:complexContent>
             <xs:extension base="PortType">
@@ -346,12 +350,12 @@
         <xs:attribute name="x" type="xs:double" use="required" />
         <xs:attribute name="y" type="xs:double" use="required" />
     </xs:complexType>
-       
+
     <xs:complexType name="SizeType">
         <xs:attribute name="width" type="xs:double" use="required" />
         <xs:attribute name="height" type="xs:double" use="required" />
     </xs:complexType>
-       
+
 
     <xs:complexType name="BendPointsType">
         <xs:sequence>
@@ -378,13 +382,13 @@
             <xs:pattern 
value="\d+\s*(ns|nano|nanos|nanosecond|nanoseconds|ms|milli|millis|millisecond|milliseconds|s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days|w|wk|wks|week|weeks)"></xs:pattern>
         </xs:restriction>
     </xs:simpleType>
-       
+
     <xs:simpleType name="DataSize">
         <xs:restriction base="xs:string">
             <xs:pattern value="\d+\s*(B|KB|MB|GB|TB|b|kb|mb|gb|tb)" />
         </xs:restriction>
     </xs:simpleType>
-       
+
     <xs:simpleType name="LogLevel">
         <xs:restriction base="xs:string">
             <xs:enumeration value="TRACE"></xs:enumeration>
@@ -396,7 +400,7 @@
             <xs:enumeration value="NONE"></xs:enumeration>
         </xs:restriction>
     </xs:simpleType>
-       
+
     <xs:simpleType name="SchedulingStrategy">
         <xs:restriction base="xs:string">
             <xs:enumeration value="EVENT_DRIVEN"></xs:enumeration>
@@ -405,7 +409,7 @@
             <xs:enumeration value="CRON_DRIVEN"></xs:enumeration>
         </xs:restriction>
     </xs:simpleType>
-    
+
     <xs:simpleType name="ExecutionNode">
         <xs:restriction base="xs:string">
             <xs:enumeration value="ALL"></xs:enumeration>
@@ -418,7 +422,7 @@
             <xs:element name="controllerService" type="ControllerServiceType" 
minOccurs="0" maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="ControllerServiceType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -428,18 +432,18 @@
             <xs:element name="class" type="NonEmptyStringType" />
             <xs:element name="bundle" type="BundleType" />
             <xs:element name="enabled" type="xs:boolean" />
-               
+
             <xs:element name="property" type="PropertyType" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="annotationData" type="xs:string" minOccurs="0" 
maxOccurs="1" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="ReportingTasksType">
         <xs:sequence>
             <xs:element name="reportingTask" type="ReportingTaskType" 
minOccurs="0" maxOccurs="unbounded" />
         </xs:sequence>
     </xs:complexType>
-    
+
     <xs:complexType name="ReportingTaskType">
         <xs:sequence>
             <xs:element name="id" type="NonEmptyStringType" />
@@ -450,7 +454,7 @@
             <xs:element name="schedulingPeriod" type="NonEmptyStringType"/>
             <xs:element name="scheduledState" type="ScheduledState" />
             <xs:element name="schedulingStrategy" type="SchedulingStrategy" />
-               
+
             <xs:element name="property" type="PropertyType" minOccurs="0" 
maxOccurs="unbounded"/>
             <xs:element name="annotationData" type="xs:string" minOccurs="0" 
maxOccurs="1" />
         </xs:sequence>

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
new file mode 100644
index 0000000..541cc7b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+public class MockFlowFileRecord implements FlowFileRecord {
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+    private final long id = idGenerator.getAndIncrement();
+    private final long entryDate = System.currentTimeMillis();
+    private final Map<String, String> attributes;
+    private final long size;
+    private final ContentClaim contentClaim;
+
+    public MockFlowFileRecord() {
+        this(1L);
+    }
+
+    public MockFlowFileRecord(final long size) {
+        this(new HashMap<>(), size);
+    }
+
+    public MockFlowFileRecord(final Map<String, String> attributes, final long 
size) {
+        this(attributes, size, null);
+    }
+
+    public MockFlowFileRecord(final Map<String, String> attributes, final long 
size, final ContentClaim contentClaim) {
+        this.attributes = attributes;
+        this.size = size;
+        this.contentClaim = contentClaim;
+
+        if (!attributes.containsKey(CoreAttributes.UUID.key())) {
+            attributes.put(CoreAttributes.UUID.key(), createFakeUUID());
+        }
+    }
+
+    public static void resetIdGenerator() {
+        idGenerator.set(0L);
+    }
+
+    private String createFakeUUID() {
+        final String s = Long.toHexString(id);
+        return new 
StringBuffer("00000000-0000-0000-0000000000000000".substring(0, (35 - 
s.length())) + s).insert(23, '-').toString();
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public long getEntryDate() {
+        return entryDate;
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        return entryDate;
+    }
+
+    @Override
+    public Long getLastQueueDate() {
+        return null;
+    }
+
+    @Override
+    public boolean isPenalized() {
+        return false;
+    }
+
+    @Override
+    public String getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    @Override
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return Collections.unmodifiableMap(attributes);
+    }
+
+    @Override
+    public int compareTo(final FlowFile o) {
+        return Long.compare(id, o.getId());
+    }
+
+    @Override
+    public long getPenaltyExpirationMillis() {
+        return 0;
+    }
+
+    @Override
+    public ContentClaim getContentClaim() {
+        return contentClaim;
+    }
+
+    @Override
+    public long getContentClaimOffset() {
+        return 0;
+    }
+
+    @Override
+    public long getLineageStartIndex() {
+        return 0;
+    }
+
+    @Override
+    public long getQueueDateIndex() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
new file mode 100644
index 0000000..33b71f0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.swap.StandardSwapContents;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
+
/**
 * In-memory {@link FlowFileSwapManager} test double.
 *
 * <p>Swap "files" are stored in the {@link #swappedOut} map keyed by a generated
 * location of the form {@code <random-uuid>.<partitionName>}. Public mutable
 * fields let tests count calls and inject failures:</p>
 * <ul>
 *   <li>{@link #incompleteSwapFileRecordsToInclude} &gt; -1 makes {@code peek}/
 *       {@code swapIn} throw {@link IncompleteSwapFileException} containing only
 *       that many leading records;</li>
 *   <li>{@link #failSwapInAfterN} &gt; -1 makes swap-ins beyond that call count
 *       throw {@link #failSwapInFailure}.</li>
 * </ul>
 */
public class MockSwapManager implements FlowFileSwapManager {
    // Swap location -> FlowFiles written there by swapOut(). Public so tests can inspect/seed it.
    public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
    public int swapOutCalledCount = 0;
    public int swapInCalledCount = 0;

    // When > -1, peek/swapIn fail with an IncompleteSwapFileException exposing this many records.
    public int incompleteSwapFileRecordsToInclude = -1;

    // When > -1, any swapIn call after the Nth throws failSwapInFailure (wrapped if checked).
    public int failSwapInAfterN = -1;
    public Throwable failSwapInFailure = null;

    public void setSwapInFailure(final Throwable t) {
        this.failSwapInFailure = t;
    }

    @Override
    public void initialize(final SwapManagerInitializationContext initializationContext) {
        // No-op: the mock needs no repositories or event reporter.
    }

    public void enableIncompleteSwapFileException(final int flowFilesToInclude) {
        incompleteSwapFileRecordsToInclude = flowFilesToInclude;
    }

    @Override
    public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
        swapOutCalledCount++;
        // Location encodes the partition name after the final '.' so it can be recovered later.
        final String location = UUID.randomUUID().toString() + "." + partitionName;
        swappedOut.put(location, new ArrayList<>(flowFiles));
        return location;
    }

    /**
     * Applies the configured failure injections, in order:
     * first the incomplete-swap-file simulation, then the after-N swap-in failure.
     *
     * <p>NOTE: when {@code remove} is true, the records are removed from
     * {@link #swappedOut} BEFORE the IncompleteSwapFileException is thrown —
     * mirroring a real manager that consumed the swap file but could only
     * recover part of it.</p>
     */
    private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException {
        if (incompleteSwapFileRecordsToInclude > -1) {
            final SwapSummary summary = getSwapSummary(swapLocation);

            final List<FlowFileRecord> records;
            if (remove) {
                records = swappedOut.remove(swapLocation);
            } else {
                records = swappedOut.get(swapLocation);
            }

            // Only the first N records are "recovered"; the rest are considered lost.
            final List<FlowFileRecord> partial = records.subList(0, incompleteSwapFileRecordsToInclude);
            final SwapContents partialContents = new StandardSwapContents(summary, partial);
            throw new IncompleteSwapFileException(swapLocation, partialContents);
        }

        // Injected swap-in failure: rethrow unchecked/Error as-is, wrap anything else.
        if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) {
            if (failSwapInFailure instanceof RuntimeException) {
                throw (RuntimeException) failSwapInFailure;
            }
            if (failSwapInFailure instanceof Error) {
                throw (Error) failSwapInFailure;
            }

            throw new RuntimeException(failSwapInFailure);
        }
    }

    @Override
    public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
        // Non-destructive read: contents remain in swappedOut.
        throwIncompleteIfNecessary(swapLocation, false);
        return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation));
    }

    @Override
    public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        // Destructive read: contents are removed from swappedOut.
        swapInCalledCount++;
        throwIncompleteIfNecessary(swapLocation, true);
        return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation));
    }

    @Override
    public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
        // Match on the ".<partitionName>" suffix encoded by swapOut(); the queue argument is ignored.
        return swappedOut.keySet().stream()
            .filter(key -> key.endsWith("." + partitionName))
            .collect(Collectors.toList());
    }

    /**
     * Computes count, total size, max FlowFile id, and the set of resource claims
     * for the records at the given location; returns an empty summary for an
     * unknown location.
     */
    @Override
    public SwapSummary getSwapSummary(String swapLocation) throws IOException {
        final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
        if (flowFiles == null) {
            return StandardSwapSummary.EMPTY_SUMMARY;
        }

        int count = 0;
        long size = 0L;
        Long max = null;
        final List<ResourceClaim> resourceClaims = new ArrayList<>();
        for (final FlowFileRecord flowFile : flowFiles) {
            count++;
            size += flowFile.getSize();
            if (max == null || flowFile.getId() > max) {
                max = flowFile.getId();
            }

            if (flowFile.getContentClaim() != null) {
                resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
            }
        }

        return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
    }

    @Override
    public void purge() {
        swappedOut.clear();
    }

    @Override
    public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) throws IOException {
        // Partition name is everything after the FIRST '.' in the location; queue argument is ignored.
        return swappedOut.keySet().stream()
            .filter(key -> key.contains("."))
            .map(key -> key.substring(key.indexOf(".") + 1))
            .collect(Collectors.toCollection(HashSet::new));
    }

    @Override
    public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException {
        // Re-key the entry, replacing the suffix after the first '.' (or appending one if absent).
        final List<FlowFileRecord> flowFiles = swappedOut.remove(swapLocation);
        if (flowFiles == null) {
            throw new IOException("Could not find swapfile with name " + swapLocation);
        }

        final String newSwapLocation;
        final int dotIndex = swapLocation.indexOf(".");
        if (dotIndex < 0) {
            newSwapLocation = swapLocation + "." + newPartitionName;
        } else {
            newSwapLocation = swapLocation.substring(0, dotIndex) + "." + newPartitionName;
        }

        swappedOut.put(newSwapLocation, flowFiles);
        return newSwapLocation;
    }
}

Reply via email to