http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java new file mode 100644 index 0000000..d6beff3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.StandardFlowFileRecord; +import org.apache.nifi.controller.repository.StandardRepositoryRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.security.util.CertificateUtils; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import 
java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; +import java.util.zip.GZIPInputStream; + +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_PROTOCOL_NEGOTIATION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.QUEUE_FULL; +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REQEUST_DIFFERENT_VERSION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.VERSION_ACCEPTED; + +public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { + private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class); + + private static final int SOCKET_CLOSED = -1; + private static final int NO_DATA_AVAILABLE = 0; + + private final FlowFileRepository flowFileRepository; + private final ContentRepository contentRepository; + private final ProvenanceRepository provenanceRepository; + private final FlowController flowController; + private final LoadBalanceAuthorizer authorizer; + + private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>(); + private final AtomicLong lineageStartIndex = new AtomicLong(0L); + + public StandardLoadBalanceProtocol(final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ProvenanceRepository provenanceRepository, + final FlowController flowController, final LoadBalanceAuthorizer authorizer) { + this.flowFileRepository = flowFileRepository; + this.contentRepository = contentRepository; + this.provenanceRepository = provenanceRepository; + this.flowController = flowController; + this.authorizer = authorizer; + } + + + @Override + public void receiveFlowFiles(final Socket socket) throws IOException { + final InputStream in = new BufferedInputStream(socket.getInputStream()); + final OutputStream out = new BufferedOutputStream(socket.getOutputStream()); + + String peerDescription = 
socket.getInetAddress().toString(); + if (socket instanceof SSLSocket) { + final SSLSession sslSession = ((SSLSocket) socket).getSession(); + + final Set<String> certIdentities; + try { + certIdentities = getCertificateIdentities(sslSession); + + final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket); + peerDescription = CertificateUtils.extractUsername(dn); + } catch (final CertificateException e) { + throw new IOException("Failed to extract Client Certificate", e); + } + + logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", + peerDescription, certIdentities); + + authorizer.authorize(certIdentities); + logger.debug("Client Identities {} are authorized to load balance data", certIdentities); + } + + final int version = negotiateProtocolVersion(in, out, peerDescription); + + if (version == SOCKET_CLOSED) { + socket.close(); + return; + } + if (version == NO_DATA_AVAILABLE) { + logger.debug("No data is available from {}", socket.getRemoteSocketAddress()); + return; + } + + receiveFlowFiles(in, out, peerDescription, version, socket.getInetAddress().getHostName()); + } + + private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException { + final Certificate[] certs = sslSession.getPeerCertificates(); + if (certs == null || certs.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } + + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]); + cert.checkValidity(); + + final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream() + .map(CertificateUtils::extractUsername) + .collect(Collectors.toSet()); + + return identities; + } + + + protected int negotiateProtocolVersion(final InputStream in, final OutputStream out, final String peerDescription) throws IOException { + final VersionNegotiator negotiator = new StandardVersionNegotiator(1); + + 
for (int i=0;; i++) { + final int requestedVersion; + try { + requestedVersion = in.read(); + } catch (final SocketTimeoutException ste) { + // If first iteration, then just consider this to indicate "no data available". Otherwise, we were truly expecting data. + if (i == 0) { + logger.debug("SocketTimeoutException thrown when trying to negotiate Protocol Version"); + return NO_DATA_AVAILABLE; + } + + throw ste; + } + + if (requestedVersion < 0) { + logger.debug("Encountered End-of-File when receiving the the recommended Protocol Version. Returning -1 for the protocol version"); + return -1; + } + + final boolean supported = negotiator.isVersionSupported(requestedVersion); + if (supported) { + logger.debug("Peer {} requested version {} of the Load Balance Protocol. Accepting version.", peerDescription, requestedVersion); + + out.write(VERSION_ACCEPTED); + out.flush(); + return requestedVersion; + } + + final Integer preferredVersion = negotiator.getPreferredVersion(requestedVersion); + if (preferredVersion == null) { + logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", peerDescription, requestedVersion); + + out.write(ABORT_PROTOCOL_NEGOTIATION); + out.flush(); + throw new IOException("Peer " + peerDescription + " requested that we use version " + requestedVersion + + " of the Load Balance Protocol, but this version is unacceptable. Aborted communications."); + } + + logger.debug("Peer {} requested version {} of the Load Balance Protocol. 
Requesting that peer change to version {} instead.", peerDescription, requestedVersion, preferredVersion); + + out.write(REQEUST_DIFFERENT_VERSION); + out.write(preferredVersion); + out.flush(); + } + } + + + protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion, final String nodeName) throws IOException { + logger.debug("Receiving FlowFiles from {}", peerDescription); + final long startTimestamp = System.currentTimeMillis(); + + final Checksum checksum = new CRC32(); + final InputStream checkedInput = new CheckedInputStream(in, checksum); + + final DataInputStream dataIn = new DataInputStream(checkedInput); + final String connectionId = getConnectionID(dataIn, peerDescription); + if (connectionId == null) { + logger.debug("Received no Connection ID from Peer {}. Will consider receipt of FlowFiles complete", peerDescription); + return; + } + + final Connection connection = flowController.getConnection(connectionId); + if (connection == null) { + logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", peerDescription, connectionId); + throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID"); + } + + final FlowFileQueue flowFileQueue = connection.getFlowFileQueue(); + if (!(flowFileQueue instanceof LoadBalancedFlowFileQueue)) { + throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but the Connection with that ID is " + + "not configured to allow for Load Balancing"); + } + + final int spaceCheck = dataIn.read(); + if (spaceCheck < 0) { + throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + 
peerDescription); + } + + if (spaceCheck == CHECK_SPACE) { + if (flowFileQueue.isFull()) { + logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId); + out.write(QUEUE_FULL); + out.flush(); + return; // we're finished receiving flowfiles for now, and we'll restart the communication process. + } else { + logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with SPACE_AVAILABLE", peerDescription, connectionId); + out.write(SPACE_AVAILABLE); + out.flush(); + } + } else if (spaceCheck != SKIP_SPACE_CHECK) { + throw new TransactionAbortedException("Expected to receive a request to determine whether or not space was available for Connection with ID " + + connectionId + " from Peer " + peerDescription + " but instead received value " + spaceCheck); + } + + final LoadBalanceCompression compression = connection.getFlowFileQueue().getLoadBalanceCompression(); + logger.debug("Receiving FlowFiles from Peer {} for Connection {}; Compression = {}", peerDescription, connectionId, compression); + + ContentClaim contentClaim = null; + final List<RemoteFlowFileRecord> flowFilesReceived = new ArrayList<>(); + OutputStream contentClaimOut = null; + long claimOffset = 0L; + + try { + try { + while (isMoreFlowFiles(dataIn, protocolVersion)) { + if (contentClaim == null) { + contentClaim = contentRepository.create(false); + contentClaimOut = contentRepository.write(contentClaim); + } else { + contentRepository.incrementClaimaintCount(contentClaim); + } + + final RemoteFlowFileRecord flowFile; + try { + flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression); + } catch (final Exception e) { + contentRepository.decrementClaimantCount(contentClaim); + throw e; + } + + flowFilesReceived.add(flowFile); + + claimOffset += flowFile.getFlowFile().getSize(); + } + } finally { + if 
(contentClaimOut != null) { + contentClaimOut.close(); + } + } + + verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size()); + completeTransaction(in, out, peerDescription, flowFilesReceived, nodeName, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue); + } catch (final Exception e) { + // If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because + // they are no longer needed. + for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) { + contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim()); + } + + throw e; + } + + logger.debug("Successfully received {} FlowFiles from Peer {} to Load Balance for Connection {}", flowFilesReceived.size(), peerDescription, connectionId); + } + + private void completeTransaction(final InputStream in, final OutputStream out, final String peerDescription, final List<RemoteFlowFileRecord> flowFilesReceived, + final String nodeName, final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException { + final int completionIndicator = in.read(); + if (completionIndicator < 0) { + throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF"); + } + + if (completionIndicator == ABORT_TRANSACTION) { + throw new TransactionAbortedException("Peer " + peerDescription + " chose to Abort Load Balance Transaction"); + } + + if (completionIndicator != COMPLETE_TRANSACTION) { + logger.debug("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator + ". 
Sending back an Abort " + + "Transaction Flag."); + out.write(ABORT_TRANSACTION); + out.flush(); + throw new IOException("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator); + } + + logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription); + registerReceiveProvenanceEvents(flowFilesReceived, nodeName, connectionId, startTimestamp); + updateFlowFileRepository(flowFilesReceived, flowFileQueue); + transferFlowFilesToQueue(flowFilesReceived, flowFileQueue); + + out.write(CONFIRM_COMPLETE_TRANSACTION); + out.flush(); + } + + private void registerReceiveProvenanceEvents(final List<RemoteFlowFileRecord> flowFiles, final String nodeName, final String connectionId, final long startTimestamp) { + final long duration = System.currentTimeMillis() - startTimestamp; + + final List<ProvenanceEventRecord> events = new ArrayList<>(flowFiles.size()); + for (final RemoteFlowFileRecord remoteFlowFile : flowFiles) { + final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile(); + + final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder() + .fromFlowFile(flowFileRecord) + .setEventType(ProvenanceEventType.RECEIVE) + .setTransitUri("nifi://" + nodeName + "/loadbalance/" + connectionId) + .setSourceSystemFlowFileIdentifier(remoteFlowFile.getRemoteUuid()) + .setEventDuration(duration) + .setComponentId(connectionId) + .setComponentType("Load Balanced Connection"); + + final ContentClaim contentClaim = flowFileRecord.getContentClaim(); + if (contentClaim != null) { + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), + contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize()); + } + + final ProvenanceEventRecord provenanceEvent = 
provenanceEventBuilder.build(); + events.add(provenanceEvent); + } + + provenanceRepository.registerEvents(events); + } + + private void updateFlowFileRepository(final List<RemoteFlowFileRecord> flowFiles, final FlowFileQueue flowFileQueue) throws IOException { + final List<RepositoryRecord> repoRecords = flowFiles.stream() + .map(remoteFlowFile -> { + final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile()); + record.setDestination(flowFileQueue); + return record; + }) + .collect(Collectors.toList()); + flowFileRepository.updateRepository(repoRecords); + } + + private void transferFlowFilesToQueue(final List<RemoteFlowFileRecord> remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) { + final List<FlowFileRecord> flowFiles = remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList()); + flowFileQueue.receiveFromPeer(flowFiles); + } + + private void verifyChecksum(final Checksum checksum, final InputStream in, final OutputStream out, final String peerDescription, final int flowFileCount) throws IOException { + final long expectedChecksum = readChecksum(in); + if (checksum.getValue() == expectedChecksum) { + logger.debug("Checksum from Peer {} matched the checksum that was calculated. Writing confirmation.", peerDescription); + out.write(CONFIRM_CHECKSUM); + out.flush(); + } else { + logger.error("Received {} FlowFiles from peer {} but the Checksum reported by the peer ({}) did not match the checksum that was calculated ({}). 
Will reject the transaction.", + flowFileCount, peerDescription, expectedChecksum, checksum.getValue()); + out.write(REJECT_CHECKSUM); + out.flush(); + throw new TransactionAbortedException("Transaction with Peer " + peerDescription + " was aborted because the calculated checksum did not match the checksum provided by peer."); + } + } + + private long readChecksum(final InputStream in) throws IOException { + final byte[] buffer = getDataBuffer(); + StreamUtils.read(in, buffer,8 ); + return ByteBuffer.wrap(buffer, 0, 8).getLong(); + } + + private byte[] getDataBuffer() { + byte[] buffer = dataBuffer.get(); + if (buffer == null) { + buffer = new byte[65536 + 4096]; + dataBuffer.set(buffer); + } + + return buffer; + } + + private String getConnectionID(final DataInputStream in, final String peerDescription) throws IOException { + try { + return in.readUTF(); + } catch (final EOFException eof) { + logger.debug("Encountered EOFException when trying to receive Connection ID from Peer {}. Returning null for Connection ID", peerDescription); + return null; + } + } + + private boolean isMoreFlowFiles(final DataInputStream in, final int protocolVersion) throws IOException { + final int indicator = in.read(); + if (indicator < 0) { + throw new EOFException(); + } + + if (indicator == MORE_FLOWFILES) { + logger.debug("Peer indicates that there is another FlowFile in transaction"); + return true; + } + if (indicator == NO_MORE_FLOWFILES) { + logger.debug("Peer indicates that there are no more FlowFiles in transaction"); + return false; + } + + throw new IOException("Expected to receive 'More FlowFiles' indicator (" + MORE_FLOWFILES + + ") or 'No More FlowFiles' indicator (" + NO_MORE_FLOWFILES + ") but received invalid value of " + indicator); + } + + private RemoteFlowFileRecord receiveFlowFile(final DataInputStream dis, final OutputStream out, final ContentClaim contentClaim, final long claimOffset, final int protocolVersion, + final String peerDescription, final 
LoadBalanceCompression compression) throws IOException { + final int metadataLength = dis.readInt(); + + DataInputStream metadataIn = new DataInputStream(new LimitingInputStream(dis, metadataLength)); + if (compression != LoadBalanceCompression.DO_NOT_COMPRESS) { + metadataIn = new DataInputStream(new GZIPInputStream(metadataIn)); + } + + final Map<String, String> attributes = readAttributes(metadataIn); + + logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription); + + final long lineageStartDate = metadataIn.readLong(); + final long entryDate = metadataIn.readLong(); + + final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(flowFileRepository.getNextFlowFileSequence()) + .addAttributes(attributes) + .contentClaim(contentClaimTriple.getContentClaim()) + .contentClaimOffset(contentClaimTriple.getClaimOffset()) + .size(contentClaimTriple.getContentLength()) + .entryDate(entryDate) + .lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement()) + .build(); + + logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength()); + return new RemoteFlowFileRecord(attributes.get(CoreAttributes.UUID.key()), flowFileRecord); + } + + private Map<String, String> readAttributes(final DataInputStream in) throws IOException { + final int attributeCount = in.readInt(); + final Map<String, String> attributes = new HashMap<>(); + for (int i = 0; i < attributeCount; i++) { + final String key = readLongString(in); + final String value = readLongString(in); + + logger.trace("Received attribute '{}' = '{}'", key, value); + attributes.put(key, value); + } + + return attributes; + } + + private String readLongString(final DataInputStream in) throws 
IOException { + final int stringLength = in.readInt(); + final byte[] bytes = new byte[stringLength]; + StreamUtils.fillBuffer(in, bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private ContentClaimTriple consumeContent(final DataInputStream in, final OutputStream out, final ContentClaim contentClaim, final long claimOffset, + final String peerDescription, final boolean compressed) throws IOException { + logger.debug("Consuming content from Peer {}", peerDescription); + + int dataFrameIndicator = in.read(); + if (dataFrameIndicator < 0) { + throw new EOFException("Encountered End-of-File when expecting to read Data Frame Indicator from Peer " + peerDescription); + } + if (dataFrameIndicator == NO_DATA_FRAME) { + logger.debug("Peer {} indicates that there is no Data Frame for the FlowFile", peerDescription); + return new ContentClaimTriple(null, 0L, 0L); + } + if (dataFrameIndicator == ABORT_TRANSACTION) { + throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted"); + } + if (dataFrameIndicator != DATA_FRAME_FOLLOWS) { + throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator); + } + + int dataFrameLength = in.readUnsignedShort(); + logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription); + + byte[] buffer = getDataBuffer(); + + long claimLength = 0; + while (true) { + final InputStream limitedIn = new LimitedInputStream(in, dataFrameLength); + final ByteCountingInputStream bcis = new ByteCountingInputStream(limitedIn); + final InputStream contentIn = compressed ? 
new GZIPInputStream(bcis) : bcis; + final int decompressedSize = StreamUtils.fillBuffer(contentIn, buffer, false); + + if (bcis.getBytesRead() < dataFrameLength) { + throw new EOFException("Expected to receive a Data Frame of length " + dataFrameLength + " bytes but received only " + bcis.getBytesRead() + " bytes"); + } + + out.write(buffer, 0, decompressedSize); + + claimLength += decompressedSize; + + dataFrameIndicator = in.read(); + if (dataFrameIndicator < 0) { + throw new EOFException("Encountered End-of-File when expecting to receive a Data Frame Indicator"); + } + if (dataFrameIndicator == NO_DATA_FRAME) { + logger.debug("Peer {} indicated that no more data frames are available", peerDescription); + break; + } + if (dataFrameIndicator == ABORT_TRANSACTION) { + logger.debug("Peer {} requested that transaction be aborted by sending Data Frame Length of {}", peerDescription, dataFrameLength); + throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted"); + } + if (dataFrameIndicator != DATA_FRAME_FOLLOWS) { + throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator); + } + + dataFrameLength = in.readUnsignedShort(); + logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription); + } + + return new ContentClaimTriple(contentClaim, claimOffset, claimLength); + } + + private static class ContentClaimTriple { + private final ContentClaim contentClaim; + private final long claimOffset; + private final long contentLength; + + public ContentClaimTriple(ContentClaim contentClaim, long claimOffset, long contentLength) { + this.contentClaim = contentClaim; + this.claimOffset = claimOffset; + this.contentLength = contentLength; + } + + public ContentClaim getContentClaim() { + return contentClaim; + } + + public long getClaimOffset() { + return claimOffset; + } + + public long getContentLength() { + return 
contentLength; + } + } + + private static class RemoteFlowFileRecord { + private final String remoteUuid; + private final FlowFileRecord flowFile; + + public RemoteFlowFileRecord(final String remoteUuid, final FlowFileRecord flowFile) { + this.remoteUuid = remoteUuid; + this.flowFile = flowFile; + } + + public String getRemoteUuid() { + return remoteUuid; + } + + public FlowFileRecord getFlowFile() { + return flowFile; + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java new file mode 100644 index 0000000..43fd4c3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/TransactionAbortedException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.server; + +import java.io.IOException; + +/** + * IOException subtype thrown to signal that a load balance transaction was aborted. + */ +public class TransactionAbortedException extends IOException { + public TransactionAbortedException(final String message) { + super(message); + } + + public TransactionAbortedException(final String message, final Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java index a407371..ecf5046 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java @@ -32,9 +32,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.util.Connectables; -/** - * - */ public class RepositoryContext { private final Connectable connectable; http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 9741cff..3496795 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -482,6 +482,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE LOG.debug(timingInfo.toString()); } } catch (final Exception e) { + LOG.error("Failed to commit session {}. Will roll back.", this, e); + try { // if we fail to commit the session, we need to roll back // the checkpoints as well because none of the checkpoints @@ -1164,11 +1166,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private void acknowledgeRecords() { - for (final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) { - LOG.trace("Acknowledging {} for {}", entry.getValue(), entry.getKey()); + final Iterator<Map.Entry<FlowFileQueue, Set<FlowFileRecord>>> itr = unacknowledgedFlowFiles.entrySet().iterator(); + while (itr.hasNext()) { + final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry = itr.next(); + itr.remove(); + entry.getKey().acknowledge(entry.getValue()); } - unacknowledgedFlowFiles.clear(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index e95845a..0a570c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.controller.serialization; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; @@ -48,12 +40,21 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.w3c.dom.Element; import org.w3c.dom.NodeList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + public class FlowFromDOMFactory { private static final Logger logger = LoggerFactory.getLogger(FlowFromDOMFactory.class); @@ -295,6 +296,10 @@ public class FlowFromDOMFactory { } dto.setPrioritizers(prioritizerClasses); + dto.setLoadBalanceStrategy(getString(element, "loadBalanceStrategy")); + 
dto.setLoadBalancePartitionAttribute(getString(element, "partitioningAttribute")); + dto.setLoadBalanceCompression(getString(element, "loadBalanceCompression")); + return dto; } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index c3d0c1f..597c8fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -16,25 +16,6 @@ */ package org.apache.nifi.controller.serialization; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.TransformerFactoryConfigurationError; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; - import org.apache.nifi.bundle.BundleCoordinate; import 
org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.ConnectableType; @@ -70,6 +51,24 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.TransformerFactoryConfigurationError; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + /** * Serializes a Flow Controller as XML to an output stream. * @@ -556,6 +555,10 @@ public class StandardFlowSerializer implements FlowSerializer<Document> { addTextElement(element, "queuePrioritizerClass", className); } + addTextElement(element, "loadBalanceStrategy", connection.getFlowFileQueue().getLoadBalanceStrategy().name()); + addTextElement(element, "partitioningAttribute", connection.getFlowFileQueue().getPartitioningAttribute()); + addTextElement(element, "loadBalanceCompression", connection.getFlowFileQueue().getLoadBalanceCompression().name()); + parentElement.appendChild(element); } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index b365de2..d3248b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -17,17 +17,6 @@ package org.apache.nifi.controller.state.manager; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.net.ssl.SSLContext; - import org.apache.commons.lang3.ArrayUtils; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.bundle.Bundle; @@ -57,9 +46,21 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class StandardStateManagerProvider implements StateManagerProvider{ private static final Logger logger = LoggerFactory.getLogger(StandardStateManagerProvider.class); + private static StateManagerProvider provider; + private final ConcurrentMap<String, StateManager> stateManagers = new ConcurrentHashMap<>(); private final StateProvider localStateProvider; private final StateProvider clusterStateProvider; @@ -69,7 +70,11 @@ public class StandardStateManagerProvider implements StateManagerProvider{ this.clusterStateProvider = 
clusterStateProvider; } - public static StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry) throws ConfigParseException, IOException { + public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry) throws ConfigParseException, IOException { + if (provider != null) { + return provider; + } + final StateProvider localProvider = createLocalStateProvider(properties,variableRegistry); final StateProvider clusterProvider; @@ -79,7 +84,8 @@ public class StandardStateManagerProvider implements StateManagerProvider{ clusterProvider = null; } - return new StandardStateManagerProvider(localProvider, clusterProvider); + provider = new StandardStateManagerProvider(localProvider, clusterProvider); + return provider; } private static StateProvider createLocalStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry) throws IOException, ConfigParseException { http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index e1846a0..7da5702 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -16,25 +16,6 @@ */ package org.apache.nifi.fingerprint; -import java.io.ByteArrayInputStream; -import 
java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.stream.Stream; - -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; @@ -57,6 +38,24 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Stream; + /** * <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation. 
* @@ -606,6 +605,10 @@ public class FingerprintFactory { appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "name")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "loadBalanceStrategy")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "partitioningAttribute")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "loadBalanceCompression")); + // relationships final NodeList relationshipElems = DomUtils.getChildNodesByTagName(connectionElem, "relationship"); final List<Element> sortedRelationshipElems = sortElements(relationshipElems, getConnectionRelationshipsComparator()); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index e173cc6..2fee764 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -54,6 +54,8 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalanceStrategy; import 
org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -473,6 +475,10 @@ public final class StandardProcessGroup implements ProcessGroup { rpg.shutdown(); } + for (final Connection connection : procGroup.getConnections()) { + connection.getFlowFileQueue().stopLoadBalancing(); + } + // Recursively shutdown child groups. for (final ProcessGroup group : procGroup.getProcessGroups()) { shutdown(group); @@ -957,7 +963,7 @@ public final class StandardProcessGroup implements ProcessGroup { public Collection<ProcessorNode> getProcessors() { readLock.lock(); try { - return new HashSet<>(processors.values()); + return new ArrayList<>(processors.values()); } finally { readLock.unlock(); } @@ -1112,6 +1118,8 @@ public final class StandardProcessGroup implements ProcessGroup { connectionToRemove.verifyCanDelete(); + connectionToRemove.getFlowFileQueue().stopLoadBalancing(); + final Connectable source = connectionToRemove.getSource(); final Connectable dest = connectionToRemove.getDestination(); @@ -3863,6 +3871,23 @@ public final class StandardProcessGroup implements ProcessGroup { .collect(Collectors.toList()); queue.setPriorities(prioritizers); + + final String loadBalanceStrategyName = proposed.getLoadBalanceStrategy(); + if (loadBalanceStrategyName == null) { + queue.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, proposed.getPartitioningAttribute()); + } else { + final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName); + final String partitioningAttribute = proposed.getPartitioningAttribute(); + + queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute); + } + + final String compressionName = proposed.getLoadBalanceCompression(); + if (compressionName == null) { + queue.setLoadBalanceCompression(LoadBalanceCompression.DO_NOT_COMPRESS); + } else { + 
queue.setLoadBalanceCompression(LoadBalanceCompression.valueOf(compressionName)); + } } private Connection addConnection(final ProcessGroup destinationGroup, final VersionedConnection proposed, final String componentIdSeed) { @@ -3884,6 +3909,7 @@ public final class StandardProcessGroup implements ProcessGroup { destinationGroup.addConnection(connection); updateConnection(connection, proposed); + flowController.onConnectionAdded(connection); return connection; } @@ -4470,7 +4496,7 @@ public final class StandardProcessGroup implements ProcessGroup { } } - // Ensure that all Prioritizers are instantiate-able. + // Ensure that all Prioritizers are instantiate-able and that any load balancing configuration is correct final Map<String, VersionedConnection> proposedConnections = new HashMap<>(); findAllConnections(updatedFlow.getFlowContents(), proposedConnections); @@ -4488,6 +4514,16 @@ public final class StandardProcessGroup implements ProcessGroup { } } } + + final String loadBalanceStrategyName = connectionToAdd.getLoadBalanceStrategy(); + if (loadBalanceStrategyName != null) { + try { + LoadBalanceStrategy.valueOf(loadBalanceStrategyName); + } catch (final IllegalArgumentException iae) { + throw new IllegalArgumentException("Unable to create Connection with Load Balance Strategy of '" + loadBalanceStrategyName + + "' because this is not a known Load Balance Strategy"); + } + } } } finally { readLock.unlock(); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 8d6e5e3..243e725 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -17,19 +17,6 @@ package org.apache.nifi.registry.flow.mapping; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import org.apache.commons.lang3.ClassUtils; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; @@ -73,6 +60,19 @@ import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + public class NiFiRegistryFlowMapper { // We need to keep a mapping of component id to versionedComponentId as we transform these objects. 
This way, when @@ -247,6 +247,11 @@ public class NiFiRegistryFlowMapper { versionedConnection.setSelectedRelationships(connection.getRelationships().stream().map(Relationship::getName).collect(Collectors.toSet())); versionedConnection.setzIndex(connection.getZIndex()); + final FlowFileQueue flowFileQueue = connection.getFlowFileQueue(); + versionedConnection.setLoadBalanceStrategy(flowFileQueue.getLoadBalanceStrategy().name()); + versionedConnection.setPartitioningAttribute(flowFileQueue.getPartitioningAttribute()); + versionedConnection.setLoadBalanceCompression(flowFileQueue.getLoadBalanceCompression().name()); + versionedConnection.setBends(connection.getBendPoints().stream() .map(this::mapPosition) .collect(Collectors.toList())); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 9e81d22..3149ee1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -30,22 +30,22 @@ <!-- Groupings of Processors/Ports --> <xs:element name="rootGroup" type="RootProcessGroupType" /> - + <!-- This exists for backward compatibility between NiFi 1.x and NiFi 0.x. 
Any Controller Service that is listed here is assigned to the root group --> <xs:element name="controllerServices" type="ControllerServicesType" minOccurs="0" maxOccurs="1" /> - + <xs:element name="reportingTasks" type="ReportingTasksType" minOccurs="0" maxOccurs="1" /> </xs:sequence> <xs:attribute name="encoding-version" type="xs:string"/> </xs:complexType> - + <xs:complexType name="RegistriesType"> <xs:sequence> <xs:element name="flowRegistry" type="FlowRegistryType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="FlowRegistryType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -54,7 +54,7 @@ <xs:element name="description" type="xs:string" /> </xs:sequence> </xs:complexType> - + <!-- the processor "id" is a key that should be valid within each flowController--> <xs:complexType name="ProcessorType"> <xs:sequence> @@ -80,21 +80,21 @@ <xs:element name="bundle" type="BundleType" /> <!-- the number of concurrent tasks for this configured - processor that can be executed at any one time. This value can be 0 + processor that can be executed at any one time. This value can be 0 IFF schedulingStrategy is EVENT_DRIVEN --> <xs:element name="maxConcurrentTasks" type="xs:nonNegativeInteger"/> <xs:element name="schedulingPeriod" type="NonEmptyStringType"/> - + <xs:element name="penalizationPeriod" type="TimePeriod" /> - + <xs:element name="yieldPeriod" type="TimePeriod" /> - + <xs:element name="bulletinLevel" type="LogLevel" /> <!-- whether or not this processor is loss-tolerant --> <xs:element name="lossTolerant" type="xs:boolean" /> - + <xs:element name="scheduledState" type="ScheduledState" /> <!-- "isolated" is deprecated. 
@@ -116,12 +116,12 @@ <!-- Annotation data used for more advanced configuration --> <xs:element name="annotationData" type="xs:string" minOccurs="0" maxOccurs="1" /> - + <!-- Indicates that a relationship with the given name can be auto-terminated --> <xs:element name="autoTerminatedRelationship" type="xs:string" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <!-- The "name" should be a key within each processor--> <xs:complexType name="PropertyType"> <xs:sequence> @@ -130,13 +130,13 @@ <xs:element name="value" type="xs:string" minOccurs="0" maxOccurs="1"/> </xs:sequence> </xs:complexType> - + <xs:simpleType name="NonEmptyStringType"> <xs:restriction base="xs:string"> <xs:minLength value="1"/> </xs:restriction> </xs:simpleType> - + <xs:simpleType name="ScheduledState"> <xs:restriction base="xs:string"> <xs:enumeration value="DISABLED"></xs:enumeration> @@ -144,7 +144,7 @@ <xs:enumeration value="STOPPED"></xs:enumeration> </xs:restriction> </xs:simpleType> - + <xs:complexType name="ProcessGroupType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -153,13 +153,13 @@ <xs:element name="position" type="PositionType" /> <xs:element name="comment" type="xs:string" /> <xs:element name="versionControlInformation" type="VersionControlInformation" minOccurs="0" maxOccurs="1" /> - + <!-- Each "processor" defines the actual dataflow work horses that make dataflow happen--> <xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="inputPort" type="PortType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="outputPort" type="PortType" minOccurs="0" maxOccurs="unbounded"/> - + <xs:element name="label" type="LabelType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="funnel" type="FunnelType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="processGroup" type="ProcessGroupType" minOccurs="0" maxOccurs="unbounded" /> @@ -170,12 +170,12 @@ <xs:element 
name="variable" type="VariableType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="VariableType"> <xs:attribute name="name" /> <xs:attribute name="value" /> </xs:complexType> - + <xs:complexType name="VersionControlInformation"> <xs:sequence> <xs:element name="registryId" type="NonEmptyStringType" /> @@ -187,7 +187,7 @@ <xs:element name="version" type="NonEmptyStringType" /> </xs:sequence> </xs:complexType> - + <!-- Same as ProcessGroupType except that instead of input ports & output ports being of type PortType, they are of type RootGroupPortType --> <xs:complexType name="RootProcessGroupType"> @@ -197,13 +197,13 @@ <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="position" type="PositionType" /> <xs:element name="comment" type="xs:string" /> - + <!-- Each "processor" defines the actual dataflow work horses that make dataflow happen--> <xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="inputPort" type="RootGroupPortType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="outputPort" type="RootGroupPortType" minOccurs="0" maxOccurs="unbounded"/> - + <xs:element name="label" type="LabelType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="funnel" type="FunnelType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="processGroup" type="ProcessGroupType" minOccurs="0" maxOccurs="unbounded" /> @@ -213,7 +213,7 @@ <xs:element name="template" type="TemplateType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="FunnelType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -221,7 +221,7 @@ <xs:element name="position" type="PositionType" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="LabelType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -232,7 +232,7 @@ <xs:element name="value" type="xs:string" /> </xs:sequence> 
</xs:complexType> - + <xs:complexType name="RemoteProcessGroupType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -259,7 +259,7 @@ <xs:element name="outputPort" type="RemoteGroupPortType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="ConnectionType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -276,11 +276,11 @@ <xs:element name="destinationType" type="NonEmptyStringType" /> <!-- relationship will be an empty string for Ports. --> <xs:element name="relationship" type="xs:string" minOccurs="1" maxOccurs="unbounded" /> - + <!-- "maxWorkQueueSize" is the maximum size this processors work queue should be before other processors are expected (not required) to stop loading new files onto it.--> <xs:element name="maxWorkQueueSize" type="xs:nonNegativeInteger"/> - + <xs:element name="maxWorkQueueDataSize" type="DataSize" minOccurs="0" maxOccurs="1" /> <!-- "flowFileExpirationMinutes" is the maximum time that a flow file may remain @@ -293,9 +293,13 @@ <!-- "queuePrioritizerClass" are Java classes that can be used to prioritize the work queues for this processor. 
The order of the prioritizers is important.--> <xs:element name="queuePrioritizerClass" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + + <xs:element name="loadBalanceStrategy" type="xs:string" minOccurs="0" maxOccurs="1" /> + <xs:element name="partitioningAttribute" type="xs:string" minOccurs="0" maxOccurs="1" /> + <xs:element name="loadBalanceCompression" type="xs:string" minOccurs="0" maxOccurs="1" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="PortType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -306,7 +310,7 @@ <xs:element name="scheduledState" type="ScheduledState" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="RootGroupPortType"> <xs:complexContent> <xs:extension base="PortType"> @@ -318,7 +322,7 @@ </xs:extension> </xs:complexContent> </xs:complexType> - + <xs:complexType name="RemoteGroupPortType"> <xs:complexContent> <xs:extension base="PortType"> @@ -346,12 +350,12 @@ <xs:attribute name="x" type="xs:double" use="required" /> <xs:attribute name="y" type="xs:double" use="required" /> </xs:complexType> - + <xs:complexType name="SizeType"> <xs:attribute name="width" type="xs:double" use="required" /> <xs:attribute name="height" type="xs:double" use="required" /> </xs:complexType> - + <xs:complexType name="BendPointsType"> <xs:sequence> @@ -378,13 +382,13 @@ <xs:pattern value="\d+\s*(ns|nano|nanos|nanosecond|nanoseconds|ms|milli|millis|millisecond|milliseconds|s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days|w|wk|wks|week|weeks)"></xs:pattern> </xs:restriction> </xs:simpleType> - + <xs:simpleType name="DataSize"> <xs:restriction base="xs:string"> <xs:pattern value="\d+\s*(B|KB|MB|GB|TB|b|kb|mb|gb|tb)" /> </xs:restriction> </xs:simpleType> - + <xs:simpleType name="LogLevel"> <xs:restriction base="xs:string"> <xs:enumeration value="TRACE"></xs:enumeration> @@ -396,7 +400,7 @@ <xs:enumeration value="NONE"></xs:enumeration> </xs:restriction> </xs:simpleType> - + 
<xs:simpleType name="SchedulingStrategy"> <xs:restriction base="xs:string"> <xs:enumeration value="EVENT_DRIVEN"></xs:enumeration> @@ -405,7 +409,7 @@ <xs:enumeration value="CRON_DRIVEN"></xs:enumeration> </xs:restriction> </xs:simpleType> - + <xs:simpleType name="ExecutionNode"> <xs:restriction base="xs:string"> <xs:enumeration value="ALL"></xs:enumeration> @@ -418,7 +422,7 @@ <xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="ControllerServiceType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -428,18 +432,18 @@ <xs:element name="class" type="NonEmptyStringType" /> <xs:element name="bundle" type="BundleType" /> <xs:element name="enabled" type="xs:boolean" /> - + <xs:element name="property" type="PropertyType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="annotationData" type="xs:string" minOccurs="0" maxOccurs="1" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="ReportingTasksType"> <xs:sequence> <xs:element name="reportingTask" type="ReportingTaskType" minOccurs="0" maxOccurs="unbounded" /> </xs:sequence> </xs:complexType> - + <xs:complexType name="ReportingTaskType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> @@ -450,7 +454,7 @@ <xs:element name="schedulingPeriod" type="NonEmptyStringType"/> <xs:element name="scheduledState" type="ScheduledState" /> <xs:element name="schedulingStrategy" type="SchedulingStrategy" /> - + <xs:element name="property" type="PropertyType" minOccurs="0" maxOccurs="unbounded"/> <xs:element name="annotationData" type="xs:string" minOccurs="0" maxOccurs="1" /> </xs:sequence> http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java new file mode 100644 index 0000000..541cc7b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +/** Mock FlowFileRecord whose id, entry date, size, attribute map and content claim are held in plain fields. */ public class MockFlowFileRecord implements FlowFileRecord { + /** Counter shared by all instances; each new instance takes its current value as id and increments it. */ private static final AtomicLong idGenerator = new AtomicLong(0L); + + /** Value drawn from idGenerator when the instance is created. */ private final long id = idGenerator.getAndIncrement(); + /** System.currentTimeMillis() captured at construction; also returned as the lineage start date. */ private final long entryDate = System.currentTimeMillis(); + private final Map<String, String> attributes; + private final long size; + private final ContentClaim contentClaim; + + /** Creates a record of size 1 with a new HashMap for attributes and no content claim. */ public MockFlowFileRecord() { + this(1L); + } + + /** Creates a record of the given size with a new HashMap for attributes and no content claim. */ public MockFlowFileRecord(final long size) { + this(new HashMap<>(), size); + } + + /** Creates a record with the given attribute map and size and a null content claim. */ public MockFlowFileRecord(final Map<String, String> attributes, final long size) { + this(attributes, size, null); + } + + /** Keeps the supplied map by reference (no copy is made); when the map has no entry for the CoreAttributes.UUID key, a generated value is put into that same map, so the caller's map is modified. */ public MockFlowFileRecord(final Map<String, String> attributes, final long size, final ContentClaim contentClaim) { + this.attributes = attributes; + this.size = size; + this.contentClaim = contentClaim; + + if (!attributes.containsKey(CoreAttributes.UUID.key())) { + attributes.put(CoreAttributes.UUID.key(), createFakeUUID()); + } + } + + /** Sets the shared id counter back to 0; ids of instances created afterwards start again from 0. */ public static void resetIdGenerator() { + idGenerator.set(0L); + } + + /** Builds a UUID-shaped string from id: the hex form of id is left-padded with the 35-character zero template, then '-' is inserted at index 23, giving an 8-4-4-4-12 layout of 36 characters. */ private String createFakeUUID() { + final String s = Long.toHexString(id); + return new StringBuffer("00000000-0000-0000-0000000000000000".substring(0, (35 - s.length())) + s).insert(23, '-').toString(); + } + + @Override + public long getId() { + return id; + } + + @Override + public long getEntryDate() { + return entryDate; + } + + /** Returns the same value as getEntryDate(); no separate lineage start date is tracked. */ @Override + public long getLineageStartDate() { + return entryDate; + } + + /** Always returns null. */ @Override + public Long getLastQueueDate() { + return null; + } + + /** Always returns false. */ @Override + public boolean isPenalized() { + return false; + } + + 
/** Looks the key up in the backing map; the result is null when the map has no value for it. */ @Override + public String getAttribute(String key) { + return attributes.get(key); + } + + @Override + public long getSize() { + return size; + } + + /** Returns an unmodifiable view of the backing map, not a copy. */ @Override + public Map<String, String> getAttributes() { + return Collections.unmodifiableMap(attributes); + } + + /** Orders FlowFiles by id using Long.compare. */ @Override + public int compareTo(final FlowFile o) { + return Long.compare(id, o.getId()); + } + + /** Always returns 0. */ @Override + public long getPenaltyExpirationMillis() { + return 0; + } + + @Override + public ContentClaim getContentClaim() { + return contentClaim; + } + + /** Always returns 0. */ @Override + public long getContentClaimOffset() { + return 0; + } + + /** Always returns 0. */ @Override + public long getLineageStartIndex() { + return 0; + } + + /** Always returns 0. */ @Override + public long getQueueDateIndex() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java new file mode 100644 index 0000000..33b71f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.IncompleteSwapFileException; +import org.apache.nifi.controller.repository.SwapContents; +import org.apache.nifi.controller.repository.SwapManagerInitializationContext; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.swap.StandardSwapContents; +import org.apache.nifi.controller.swap.StandardSwapSummary; + +/** FlowFileSwapManager that keeps swapped-out FlowFiles in a HashMap instead of on disk and exposes public counters and failure-injection fields. */ public class MockSwapManager implements FlowFileSwapManager { + /** Swapped-out records keyed by swap location; swapOut creates keys of the form "<random UUID>.<partitionName>". */ public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); + /** Number of swapOut calls so far. */ public int swapOutCalledCount = 0; + /** Number of swapIn calls so far; peek does not change it. */ public int swapInCalledCount = 0; + + /** When greater than -1, peek and swapIn throw IncompleteSwapFileException carrying this many leading records. */ public int incompleteSwapFileRecordsToInclude = -1; + + /** When greater than -1, the configured failure is thrown once swapInCalledCount exceeds this value. */ public int failSwapInAfterN = -1; + /** Throwable raised by the swap-in failure check; rethrown as-is when it is a RuntimeException or Error, otherwise wrapped in a RuntimeException. */ public Throwable failSwapInFailure = null; + + /** Sets failSwapInFailure; failSwapInAfterN must also be greater than -1 for it to be thrown. */ public void setSwapInFailure(final Throwable t) { + this.failSwapInFailure = t; + } + + /** No-op; the initialization context is not used. */ @Override + public void initialize(final SwapManagerInitializationContext initializationContext) { + + } + + /** Sets incompleteSwapFileRecordsToInclude, turning on the IncompleteSwapFileException path for values greater than -1. */ public void 
enableIncompleteSwapFileException(final int flowFilesToInclude) { + incompleteSwapFileRecordsToInclude = flowFilesToInclude; + } + + /** Increments swapOutCalledCount, stores a copy of the list under a new "<random UUID>.<partitionName>" key and returns that key; flowFileQueue is not used. */ @Override + public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException { + swapOutCalledCount++; + final String location = UUID.randomUUID().toString() + "." + partitionName; + swappedOut.put(location, new ArrayList<>(flowFiles)); + return location; + } + + /** Failure-injection check shared by peek and swapIn. First, when incompleteSwapFileRecordsToInclude is greater than -1, throws IncompleteSwapFileException whose contents pair the summary of the full stored list (computed before any removal) with only the first incompleteSwapFileRecordsToInclude records; the location is removed from the map only when remove is true. Otherwise, when failSwapInAfterN is greater than -1 and swapInCalledCount exceeds it, throws failSwapInFailure. NOTE(review): for a location that is not in the map, records is null and the subList call would throw NullPointerException - confirm callers only pass known locations. */ private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException { + if (incompleteSwapFileRecordsToInclude > -1) { + final SwapSummary summary = getSwapSummary(swapLocation); + + final List<FlowFileRecord> records; + if (remove) { + records = swappedOut.remove(swapLocation); + } else { + records = swappedOut.get(swapLocation); + } + + final List<FlowFileRecord> partial = records.subList(0, incompleteSwapFileRecordsToInclude); + final SwapContents partialContents = new StandardSwapContents(summary, partial); + throw new IncompleteSwapFileException(swapLocation, partialContents); + } + + if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) { + if (failSwapInFailure instanceof RuntimeException) { + throw (RuntimeException) failSwapInFailure; + } + if (failSwapInFailure instanceof Error) { + throw (Error) failSwapInFailure; + } + + throw new RuntimeException(failSwapInFailure); + } + } + + /** Runs the failure-injection checks, then returns the summary and stored list for the location without removing it; flowFileQueue is not used. */ @Override + public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { + throwIncompleteIfNecessary(swapLocation, false); + return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation)); + } + + /** Increments swapInCalledCount, runs the failure-injection checks, then computes the summary and removes the location from the map, returning both; flowFileQueue is not used. */ @Override + public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { + swapInCalledCount++; + throwIncompleteIfNecessary(swapLocation, true); + return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation)); + } + + /** Returns the stored locations whose key ends with "." followed by partitionName; flowFileQueue is not used. */ @Override + public 
List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, final String partitionName) throws IOException { + return swappedOut.keySet().stream() + .filter(key -> key.endsWith("." + partitionName)) + .collect(Collectors.toList()); + } + + /** Summarises the records stored at the location: record count, total size, highest id (null when the list is empty) and the resource claim of every record that has a content claim. Returns StandardSwapSummary.EMPTY_SUMMARY when the location is not in the map. */ @Override + public SwapSummary getSwapSummary(String swapLocation) throws IOException { + final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation); + if (flowFiles == null) { + return StandardSwapSummary.EMPTY_SUMMARY; + } + + int count = 0; + long size = 0L; + Long max = null; + final List<ResourceClaim> resourceClaims = new ArrayList<>(); + for (final FlowFileRecord flowFile : flowFiles) { + count++; + size += flowFile.getSize(); + if (max == null || flowFile.getId() > max) { + max = flowFile.getId(); + } + + if (flowFile.getContentClaim() != null) { + resourceClaims.add(flowFile.getContentClaim().getResourceClaim()); + } + } + + return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims); + } + + /** Discards every stored swap location; the call counters are left unchanged. */ @Override + public void purge() { + swappedOut.clear(); + } + + /** Returns, as a new HashSet, the text after the first "." of every stored location that contains a "."; queue is not used. */ @Override + public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) throws IOException { + return swappedOut.keySet().stream() + .filter(key -> key.contains(".")) + .map(key -> key.substring(key.indexOf(".") + 1)) + .collect(Collectors.toCollection(HashSet::new)); + } + + /** Re-keys the records stored at swapLocation: everything from the first "." onward is replaced by "." plus newPartitionName (or that suffix is appended when the location has no "."), and the new key is returned. Throws IOException when the location is not in the map. */ @Override + public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException { + final List<FlowFileRecord> flowFiles = swappedOut.remove(swapLocation); + if (flowFiles == null) { + throw new IOException("Could not find swapfile with name " + swapLocation); + } + + final String newSwapLocation; + final int dotIndex = swapLocation.indexOf("."); + if (dotIndex < 0) { + newSwapLocation = swapLocation + "." + newPartitionName; + } else { + newSwapLocation = swapLocation.substring(0, dotIndex) + "." + newPartitionName; + } + + swappedOut.put(newSwapLocation, flowFiles); + return newSwapLocation; + } +}
