http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 0000000,8c60e4b..acb3a01 mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@@ -1,0 -1,173 +1,194 @@@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.scheduling;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

/**
 * This class is essentially an empty shell for {@link Connectable}s that are
 * not Processors.
 * <p>
 * Property lookups always produce an unset value and the property map is
 * always empty. Yielding, the concurrent-task limit and relationship
 * availability are answered by the wrapped {@link Connectable}; encryption and
 * decryption are answered by the supplied {@link StringEncryptor}.
 * </p>
 */
public class ConnectableProcessContext implements ProcessContext {

    private final Connectable connectable;
    private final StringEncryptor encryptor;

    /**
     * @param connectable the component this context describes
     * @param encryptor the encryptor used by {@link #encrypt(String)} and
     *            {@link #decrypt(String)}
     */
    public ConnectableProcessContext(final Connectable connectable, final StringEncryptor encryptor) {
        this.connectable = connectable;
        this.encryptor = encryptor;
    }

    /**
     * Delegates to {@link #getProperty(String)} using the descriptor's name.
     */
    @Override
    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
        return getProperty(descriptor.getName());
    }

    /**
     * Returns a placeholder value that is never set, regardless of the
     * property name: every typed accessor returns {@code null},
     * {@code isSet()} returns {@code false} and expression evaluation returns
     * the placeholder itself.
     */
    @Override
    public PropertyValue getProperty(final String propertyName) {
        return new PropertyValue() {
            @Override
            public String getValue() {
                return null;
            }

            @Override
            public Integer asInteger() {
                return null;
            }

            @Override
            public Long asLong() {
                return null;
            }

            @Override
            public Boolean asBoolean() {
                return null;
            }

            @Override
            public Float asFloat() {
                return null;
            }

            @Override
            public Double asDouble() {
                return null;
            }

            @Override
            public Long asTimePeriod(final TimeUnit timeUnit) {
                return null;
            }

            @Override
            public Double asDataSize(final DataUnit dataUnit) {
                return null;
            }

            @Override
            public PropertyValue evaluateAttributeExpressions() throws ProcessException {
                return this;
            }

            @Override
            public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
                return this;
            }

            @Override
            public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
                return this;
            }

            @Override
            public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
                return this;
            }

            @Override
            public ControllerService asControllerService() {
                return null;
            }

            @Override
            public <T extends ControllerService> T asControllerService(Class<T> serviceType) throws IllegalArgumentException {
                return null;
            }

            @Override
            public boolean isSet() {
                return false;
            }
        };
    }

    /**
     * Not supported by this context.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public PropertyValue newPropertyValue(String rawValue) {
        throw new UnsupportedOperationException();
    }

    /** Yields the wrapped {@link Connectable}. */
    @Override
    public void yield() {
        connectable.yield();
    }

    /** @return the concurrent-task limit reported by the wrapped {@link Connectable} */
    @Override
    public int getMaxConcurrentTasks() {
        return connectable.getMaxConcurrentTasks();
    }

    /** @return always {@code null} */
    @Override
    public String getAnnotationData() {
        return null;
    }

    /** @return a new, empty, mutable map on every call */
    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        return new HashMap<>();
    }

    @Override
    public String decrypt(String encrypted) {
        return encryptor.decrypt(encrypted);
    }

    @Override
    public String encrypt(String unencrypted) {
        return encryptor.encrypt(unencrypted);
    }

    /** @return always {@code null} */
    @Override
    public ControllerServiceLookup getControllerServiceLookup() {
        return null;
    }

    /**
     * Reports which relationships can currently accept data.
     * <p>
     * If the FlowFile queue of any connection returned by
     * {@link Connectable#getConnections()} is full, no relationship is
     * available and an empty set is returned. Otherwise all of the
     * connectable's relationships are returned.
     * </p>
     *
     * @return an empty set when any connection's queue is full; otherwise a
     *         new set holding a copy of the connectable's relationships
     */
    @Override
    public Set<Relationship> getAvailableRelationships() {
        for (final Connection connection : connectable.getConnections()) {
            if (connection.getFlowFileQueue().isFull()) {
                return Collections.emptySet();
            }
        }

        // Always copy: this avoids an unchecked Collection -> Set cast and
        // keeps callers from holding (and possibly mutating) the collection
        // owned by the connectable. The relationships are fetched only once.
        final Collection<Relationship> relationships = connectable.getRelationships();
        return new HashSet<>(relationships);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 0000000,93a8c6b..cd0d31c mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@@ -1,0 -1,145 +1,173 @@@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processor;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;

/**
 * {@link ProcessContext} backed by a {@link ProcessorNode}. It also acts as
 * the {@link ControllerServiceLookup} for that processor by delegating to a
 * {@link ControllerServiceProvider}.
 */
public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {

    private final ProcessorNode procNode;
    private final ControllerServiceProvider controllerServiceProvider;
    private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
    private final StringEncryptor encryptor;

    /**
     * Creates the context and prepares a query for every property currently
     * reported by the processor node. A property whose configured value is
     * {@code null} is prepared from its descriptor's default value.
     *
     * @param processorNode the node whose configuration this context exposes
     * @param controllerServiceProvider the provider used for all controller service lookups
     * @param encryptor the encryptor used by {@link #encrypt(String)} and
     *            {@link #decrypt(String)}
     */
    public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
        this.procNode = processorNode;
        this.controllerServiceProvider = controllerServiceProvider;
        this.encryptor = encryptor;

        preparedQueries = new HashMap<>();
        for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
            final PropertyDescriptor desc = entry.getKey();
            String value = entry.getValue();
            if (value == null) {
                value = desc.getDefaultValue();
            }

            final PreparedQuery pq = Query.prepare(value);
            preparedQueries.put(desc, pq);
        }
    }

    /**
     * Delegates to {@link #getProperty(String)} using the descriptor's name.
     */
    @Override
    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
        return getProperty(descriptor.getName());
    }

    /**
     * <p>
     * Returns the currently configured value for the property with the given
     * name. If the property has no configured value, the descriptor's default
     * value is used instead.
     * </p>
     *
     * @param propertyName the name of the property to look up
     * @return the property value, or {@code null} if the processor has no
     *         descriptor for the given name
     */
    @Override
    public PropertyValue getProperty(final String propertyName) {
        final Processor processor = procNode.getProcessor();
        final PropertyDescriptor descriptor = processor.getPropertyDescriptor(propertyName);
        if (descriptor == null) {
            return null;
        }

        final String setPropertyValue = procNode.getProperty(descriptor);
        final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;

        return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor));
    }

    /**
     * Wraps an arbitrary raw value in a {@link PropertyValue}, preparing a
     * fresh query for it.
     */
    @Override
    public PropertyValue newPropertyValue(final String rawValue) {
        return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue));
    }

    /** Yields the underlying processor node. */
    @Override
    public void yield() {
        procNode.yield();
    }

    @Override
    public ControllerService getControllerService(final String serviceIdentifier) {
        return controllerServiceProvider.getControllerService(serviceIdentifier);
    }

    @Override
    public int getMaxConcurrentTasks() {
        return procNode.getMaxConcurrentTasks();
    }

    @Override
    public String getAnnotationData() {
        return procNode.getAnnotationData();
    }

    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        return procNode.getProperties();
    }

    @Override
    public String encrypt(final String unencrypted) {
        return encryptor.encrypt(unencrypted);
    }

    @Override
    public String decrypt(final String encrypted) {
        return encryptor.decrypt(encrypted);
    }

    /**
     * @throws IllegalArgumentException if {@code serviceType} is not an interface
     */
    @Override
    public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
        if (!serviceType.isInterface()) {
            throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
        }
        return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
    }

    @Override
    public boolean isControllerServiceEnabled(final ControllerService service) {
        return controllerServiceProvider.isControllerServiceEnabled(service);
    }

    @Override
    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
        return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
    }

    /** @return this context, which itself implements {@link ControllerServiceLookup} */
    @Override
    public ControllerServiceLookup getControllerServiceLookup() {
        return this;
    }

    /**
     * Reports which of the processor's relationships can currently accept
     * data. A relationship is available when none of its connections has a
     * full FlowFile queue; a relationship without any connection is therefore
     * always available.
     *
     * @return a new, mutable set of the available relationships
     */
    @Override
    public Set<Relationship> getAvailableRelationships() {
        final Set<Relationship> available = new HashSet<>();
        for (final Relationship relationship : procNode.getRelationships()) {
            if (!isAnyQueueFull(procNode.getConnections(relationship))) {
                available.add(relationship);
            }
        }

        return available;
    }

    /**
     * Returns {@code true} as soon as one connection with a full FlowFile
     * queue is found; returns {@code false} for an empty collection.
     */
    private static boolean isAnyQueueFull(final Collection<Connection> connections) {
        for (final Connection connection : connections) {
            if (connection.getFlowFileQueue().isFull()) {
                return true;
            }
        }
        return false;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 0000000,0fe08c9..318901f mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@@ -1,0 -1,107 +1,113 @@@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processor;

import java.util.Map;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;

/**
 * {@link SchedulingContext} that forwards every {@link ProcessContext}
 * operation to a wrapped context and adds the ability to lease a Controller
 * Service on behalf of a processor node.
 */
public class StandardSchedulingContext implements SchedulingContext {

    private final ProcessContext delegate;
    private final ControllerServiceProvider controllerServices;
    private final ProcessorNode owner;

    /**
     * @param processContext the context that answers all {@link ProcessContext} calls
     * @param serviceProvider the provider used to resolve Controller Service nodes
     * @param processorNode the node registered as a reference on leased services
     */
    public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) {
        this.delegate = processContext;
        this.controllerServices = serviceProvider;
        this.owner = processorNode;
    }

    /**
     * Registers the processor node as a reference on the identified
     * Controller Service.
     *
     * @throws IllegalArgumentException if no service exists with the given identifier
     * @throws IllegalStateException if the service is disabled or not valid
     */
    @Override
    public void leaseControllerService(final String identifier) {
        resolveLeasableService(identifier).addReference(owner);
    }

    /**
     * Looks up the service node and verifies, in order, that it exists, is not
     * disabled and is valid.
     */
    private ControllerServiceNode resolveLeasableService(final String identifier) {
        final ControllerServiceNode node = controllerServices.getControllerServiceNode(identifier);

        if (node == null) {
            throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
        }
        if (node.isDisabled()) {
            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is currently disabled");
        }
        if (!node.isValid()) {
            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is not currently valid");
        }

        return node;
    }

    // ---- Everything below forwards unchanged to the wrapped ProcessContext ----

    @Override
    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
        return delegate.getProperty(descriptor);
    }

    @Override
    public PropertyValue getProperty(final String propertyName) {
        return delegate.getProperty(propertyName);
    }

    @Override
    public PropertyValue newPropertyValue(final String rawValue) {
        return delegate.newPropertyValue(rawValue);
    }

    @Override
    public void yield() {
        delegate.yield();
    }

    @Override
    public int getMaxConcurrentTasks() {
        return delegate.getMaxConcurrentTasks();
    }

    @Override
    public String getAnnotationData() {
        return delegate.getAnnotationData();
    }

    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        return delegate.getProperties();
    }

    @Override
    public String encrypt(final String unencrypted) {
        return delegate.encrypt(unencrypted);
    }

    @Override
    public String decrypt(final String encrypted) {
        return delegate.decrypt(encrypted);
    }

    @Override
    public ControllerServiceLookup getControllerServiceLookup() {
        return delegate.getControllerServiceLookup();
    }

    @Override
    public Set<Relationship> getAvailableRelationships() {
        return delegate.getAvailableRelationships();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 0000000,22ec983..d4b4f61 mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@@ -1,0 -1,510 +1,510 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + package org.apache.nifi.remote.protocol.socket; + + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.TimeUnit; + import java.util.zip.CRC32; + import java.util.zip.CheckedInputStream; + import java.util.zip.CheckedOutputStream; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.remote.Peer; + import org.apache.nifi.remote.PeerStatus; + import org.apache.nifi.remote.RemoteGroupPort; + import org.apache.nifi.remote.RemoteResourceFactory; + import org.apache.nifi.remote.StandardVersionNegotiator; + import org.apache.nifi.remote.VersionNegotiator; + import org.apache.nifi.remote.codec.FlowFileCodec; + import org.apache.nifi.remote.codec.StandardFlowFileCodec; + import org.apache.nifi.remote.exception.HandshakeException; + import org.apache.nifi.remote.exception.ProtocolException; + import org.apache.nifi.remote.io.CompressionInputStream; + import org.apache.nifi.remote.io.CompressionOutputStream; + import org.apache.nifi.remote.protocol.ClientProtocol; + import org.apache.nifi.remote.protocol.CommunicationsSession; + import org.apache.nifi.remote.protocol.RequestType; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.StopWatch; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class SocketClientProtocol implements ClientProtocol { + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + + + private RemoteGroupPort port; + private boolean useCompression; + + private String 
commsIdentifier; + private boolean handshakeComplete = false; + + private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); + + private Response handshakeResponse = null; + private boolean readyForFileTransfer = false; + private String transitUriPrefix = null; + + private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + + public SocketClientProtocol() { + } + + public void setPort(final RemoteGroupPort port) { + this.port = port; + this.useCompression = port.isUseCompression(); + } + + @Override + public void handshake(final Peer peer) throws IOException, HandshakeException { + if ( handshakeComplete ) { + throw new IllegalStateException("Handshake has already been completed"); + } + commsIdentifier = UUID.randomUUID().toString(); + logger.debug("{} handshaking with {}", this, peer); + + final Map<HandshakeProperty, String> properties = new HashMap<>(); + properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); + properties.put(HandshakeProperty.PORT_IDENTIFIER, port.getIdentifier()); + properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf( + port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) ); + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + dos.writeUTF(commsIdentifier); + + if ( versionNegotiator.getVersion() >= 3 ) { + dos.writeUTF(peer.getUrl()); + transitUriPrefix = peer.getUrl(); + + if ( !transitUriPrefix.endsWith("/") ) { + transitUriPrefix = transitUriPrefix + "/"; + } + } + + dos.writeInt(properties.size()); + for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) 
{ + dos.writeUTF(entry.getKey().name()); + dos.writeUTF(entry.getValue()); + } + + dos.flush(); + + try { + handshakeResponse = Response.read(dis); + } catch (final ProtocolException e) { + throw new HandshakeException(e); + } + + switch (handshakeResponse.getCode()) { + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + break; + case PROPERTIES_OK: + readyForFileTransfer = true; + break; + default: + logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { + this, handshakeResponse, peer}); + peer.close(); + throw new HandshakeException("Received unexpected response " + handshakeResponse); + } + + logger.debug("{} Finished handshake with {}", this, peer); + handshakeComplete = true; + } + + public boolean isReadyForFileTransfer() { + return readyForFileTransfer; + } + + public boolean isPortInvalid() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; + } + + public boolean isPortUnknown() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; + } + + public boolean isDestinationFull() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; + } + + @Override + public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Get Peer Statuses from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos 
= new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.REQUEST_PEER_LIST.writeRequestType(dos); + dos.flush(); + final int numPeers = dis.readInt(); + final Set<PeerStatus> peers = new HashSet<>(numPeers); + for (int i=0; i < numPeers; i++) { + final String hostname = dis.readUTF(); + final int port = dis.readInt(); + final boolean secure = dis.readBoolean(); + final int flowFileCount = dis.readInt(); + peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); + } + + logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); + return peers; + } + + @Override + public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Negotiating Codec with {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); + + FlowFileCodec codec = new StandardFlowFileCodec(); + try { + codec = (FlowFileCodec) RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos); + } catch (HandshakeException e) { + throw new ProtocolException(e.toString()); + } + logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); + + return codec; + } + + + @Override + public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot receive files; handshake resolution was " + 
handshakeResponse); + } + + logger.debug("{} Receiving FlowFiles from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String userDn = commsSession.getUserDn(); + if ( userDn == null ) { + userDn = "none"; + } + + // Indicate that we would like to have some data + RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); + dos.flush(); + + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + return; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + + final StopWatch stopWatch = new StopWatch(true); + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + final CRC32 crc = new CRC32(); + + // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data. + boolean continueTransaction = true; + String calculatedCRC = ""; + while (continueTransaction) { + final InputStream flowFileInputStream = useCompression ? 
new CompressionInputStream(dis) : dis; + final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); + + final long startNanos = System.nanoTime(); + FlowFile flowFile = codec.decode(checkedIn, session); + final long transmissionNanos = System.nanoTime() - startNanos; + final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); + + final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key()); + flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis); + + session.transfer(flowFile, Relationship.ANONYMOUS); + bytesReceived += flowFile.getSize(); + flowFilesReceived.add(flowFile); + logger.debug("{} Received {} from {}", this, flowFile, peer); + + final Response transactionCode = Response.read(dis); + switch (transactionCode.getCode()) { + case CONTINUE_TRANSACTION: + logger.trace("{} Received ContinueTransaction indicator from {}", this, peer); + break; + case FINISH_TRANSACTION: + logger.trace("{} Received FinishTransaction indicator from {}", this, peer); + continueTransaction = false; + calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue()); + break; + default: + throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode); + } + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); + + final Response confirmTransactionResponse = Response.read(dis); + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + session.rollback(); + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + // Commit the session so that we have persisted the data + session.commit(); + - if ( session.getAvailableRelationships().isEmpty() ) { ++ if ( context.getAvailableRelationships().isEmpty() ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); + } + + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } + + @Override + public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse); + } + + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + logger.debug("{} Sending FlowFiles to {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String userDn = commsSession.getUserDn(); + if ( userDn == null ) { + userDn = "none"; + } + + // Indicate that we would like to have some data + RequestType.SEND_FLOWFILES.writeRequestType(dos); + dos.flush(); + + final StopWatch stopWatch = new StopWatch(true); + final CRC32 crc = new CRC32(); + + long bytesSent = 0L; + final Set<FlowFile> flowFilesSent = new HashSet<>(); + boolean continueTransaction = true; + String calculatedCRC = ""; + final long startSendingNanos = System.nanoTime(); + while (continueTransaction) { + final OutputStream flowFileOutputStream = useCompression ? 
new CompressionOutputStream(dos) : dos; + logger.debug("{} Sending {} to {}", this, flowFile, peer); + + final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); + + final long startNanos = System.nanoTime(); + flowFile = codec.encode(flowFile, session, checkedOutStream); + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( useCompression ) { + checkedOutStream.close(); + } + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + logger.debug("{} Sent {} to {}", this, flowFile, peer); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startSendingNanos; + if ( sendingNanos < BATCH_SEND_NANOS ) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + if ( continueTransaction ) { + logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer); + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(dos); + + calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() ); + } + } + + // we've sent a FINISH_TRANSACTION. 
Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(dis); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + if ( versionNegotiator.getVersion() > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(dos); + session.rollback(); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + + final Response transactionResponse; + try { + transactionResponse = Response.read(dis); + } catch (final IOException e) { + logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + + " It is unknown whether or not the peer successfully received/processed the data." 
+ + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", + this, peer, session, flowFileDescription); + session.rollback(); + throw e; + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS)); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + + session.commit(); + + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public void shutdown(final Peer peer) throws IOException { + readyForFileTransfer = false; + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + logger.debug("{} Shutting down with {}", this, peer); + // Indicate that we would like to have some data + RequestType.SHUTDOWN.writeRequestType(dos); + dos.flush(); + } + + @Override + public String getResourceName() { + return "SocketFlowFileProtocol"; + } + + @Override + public String toString() { + return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 0000000,88b6a41..5edd4f9 mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@@ -1,0 -1,581 +1,581 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.remote.protocol.socket; + + import java.io.DataInputStream; + import java.io.DataOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.net.InetAddress; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.Set; + import java.util.UUID; + import java.util.concurrent.TimeUnit; + import java.util.zip.CRC32; + import java.util.zip.CheckedInputStream; + import java.util.zip.CheckedOutputStream; + + import org.apache.nifi.cluster.NodeInformant; + import org.apache.nifi.connectable.Connection; + import org.apache.nifi.connectable.Port; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.groups.ProcessGroup; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.remote.Peer; + import org.apache.nifi.remote.PortAuthorizationResult; + import org.apache.nifi.remote.RemoteResourceFactory; + import org.apache.nifi.remote.RootGroupPort; + import org.apache.nifi.remote.StandardVersionNegotiator; + import org.apache.nifi.remote.VersionNegotiator; + import org.apache.nifi.remote.codec.FlowFileCodec; + import org.apache.nifi.remote.exception.HandshakeException; + import org.apache.nifi.remote.exception.ProtocolException; + import org.apache.nifi.remote.io.CompressionInputStream; + import org.apache.nifi.remote.io.CompressionOutputStream; + import org.apache.nifi.remote.protocol.CommunicationsSession; + import org.apache.nifi.remote.protocol.RequestType; + import org.apache.nifi.remote.protocol.ServerProtocol; + import org.apache.nifi.util.FormatUtils; + import org.apache.nifi.util.NiFiProperties; + import org.apache.nifi.util.StopWatch; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class SocketFlowFileServerProtocol 
implements ServerProtocol { + public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; + + private ProcessGroup rootGroup; + private String commsIdentifier; + private boolean handshakeCompleted; + + private Boolean useGzip; + private long requestExpirationMillis; + private RootGroupPort port; + private boolean shutdown = false; + private FlowFileCodec negotiatedFlowFileCodec = null; + private String transitUriPrefix = null; + + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class); + + private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + + + @Override + public void setRootProcessGroup(final ProcessGroup group) { + if ( !group.isRootGroup() ) { + throw new IllegalArgumentException(); + } + this.rootGroup = group; + } + + @Override + public void handshake(final Peer peer) throws IOException, HandshakeException { + if ( handshakeCompleted ) { + throw new IllegalStateException("Handshake has already been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Handshaking with {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + commsIdentifier = dis.readUTF(); + + if ( versionNegotiator.getVersion() >= 3 ) { + transitUriPrefix = dis.readUTF(); + if ( !transitUriPrefix.endsWith("/") ) { + transitUriPrefix = transitUriPrefix + "/"; + } + } + + final Map<String, String> properties = new HashMap<>(); + final int numProperties = dis.readInt(); + for (int i=0; i < numProperties; i++) { + final String propertyName = dis.readUTF(); + final String propertyValue = dis.readUTF(); + 
properties.put(propertyName, propertyValue); + } + + // evaluate the properties received + boolean responseWritten = false; + for ( final Map.Entry<String, String> entry : properties.entrySet() ) { + final String propertyName = entry.getKey(); + final String value = entry.getValue(); + + final HandshakeProperty property; + try { + property = HandshakeProperty.valueOf(propertyName); + } catch (final Exception e) { + ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName); + throw new HandshakeException("Received unknown property: " + propertyName); + } + + switch (property) { + case GZIP: { + useGzip = Boolean.parseBoolean(value); + break; + } + case REQUEST_EXPIRATION_MILLIS: + requestExpirationMillis = Long.parseLong(value); + break; + case PORT_IDENTIFIER: { + Port receivedPort = rootGroup.getInputPort(value); + if ( receivedPort == null ) { + receivedPort = rootGroup.getOutputPort(value); + } + if ( receivedPort == null ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received unknown port identifier: " + value); + } + if ( !(receivedPort instanceof RootGroupPort) ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); + } + + this.port = (RootGroupPort) receivedPort; + final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); + if ( !portAuthResult.isAuthorized() ) { + logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); + ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation()); + responseWritten = true; + break; + } + + if ( !receivedPort.isValid() ) { + logger.debug("Responding with 
ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); + responseWritten = true; + break; + } + + if ( !receivedPort.isRunning() ) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); + responseWritten = true; + break; + } + + // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this + // we we will simply not service the request but the sender will timeout + if ( getVersionNegotiator().getVersion() > 1 ) { + for ( final Connection connection : port.getConnections() ) { + if ( connection.getFlowFileQueue().isFull() ) { + logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); + ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); + responseWritten = true; + break; + } + } + } + + break; + } + } + } + + if ( useGzip == null ) { + logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing"); + ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name()); + throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name()); + } + if ( port == null ) { + logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing"); + ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name()); + throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name()); + } + + // send "OK" response + if ( !responseWritten ) { + ResponseCode.PROPERTIES_OK.writeResponse(dos); + } + + logger.debug("{} Finished handshake with {}", this, peer); + handshakeCompleted = true; + } + + @Override + public boolean isHandshakeSuccessful() { + return handshakeCompleted; + } + + @Override + public RootGroupPort getPort() { + return port; + } + + @Override + public 
FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { + if ( !handshakeCompleted ) { + throw new IllegalStateException("Handshake has not been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Negotiating Codec with {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()}); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + // Negotiate the FlowFileCodec to use. + try { + negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos); + logger.debug("{} Negotiated Codec {} with {}", new Object[] {this, negotiatedFlowFileCodec, peer}); + return negotiatedFlowFileCodec; + } catch (final HandshakeException e) { + throw new ProtocolException(e.toString()); + } + } + + @Override + public FlowFileCodec getPreNegotiatedCodec() { + return negotiatedFlowFileCodec; + } + + + @Override + public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeCompleted ) { + throw new IllegalStateException("Handshake has not been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Sending FlowFiles to {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String remoteDn = commsSession.getUserDn(); + if ( remoteDn == null ) { + remoteDn = "none"; + } + + FlowFile flowFile = session.get(); + if ( flowFile == null 
) { + // we have no data to send. Notify the peer. + logger.debug("{} No data to send to {}", this, peer); + ResponseCode.NO_MORE_DATA.writeResponse(dos); + return 0; + } + + // we have data to send. + logger.debug("{} Data is available to send to {}", this, peer); + ResponseCode.MORE_DATA.writeResponse(dos); + + final StopWatch stopWatch = new StopWatch(true); + long bytesSent = 0L; + final Set<FlowFile> flowFilesSent = new HashSet<>(); + final CRC32 crc = new CRC32(); + + // send data until we reach some batch size + boolean continueTransaction = true; + final long startNanos = System.nanoTime(); + String calculatedCRC = ""; + while (continueTransaction) { + final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos; + logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer}); + + final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); + + final StopWatch transferWatch = new StopWatch(true); + flowFile = codec.encode(flowFile, session, checkedOutputStream); + final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( useGzip ) { + checkedOutputStream.close(); + } + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + + final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startNanos; + if ( sendingNanos < BATCH_NANOS ) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + if ( continueTransaction ) { + logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer); + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } else { + logger.debug("{} Sending FinishTransaction indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(dos); + calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue()); + } + } + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(dis); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm Checksum and echo back the confirmation. 
+ logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + if ( versionNegotiator.getVersion() > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(dos); + session.rollback(); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + + final Response transactionResponse; + try { + transactionResponse = Response.read(dis); + } catch (final IOException e) { + logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + + " It is unknown whether or not the peer successfully received/processed the data." 
+ + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", + this, peer, session, flowFileDescription); + session.rollback(); + throw e; + } + + logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer}); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS)); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + session.commit(); + + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesSent.size(); + } + + + @Override + public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeCompleted ) { + throw new IllegalStateException("Handshake has not been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} receiving FlowFiles from {}", this, peer); + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String remoteDn = commsSession.getUserDn(); + if ( remoteDn == null ) { + remoteDn = "none"; + } + + final StopWatch stopWatch = new StopWatch(true); + final CRC32 crc = new 
CRC32(); + + // Peer has data. Otherwise, we would not have been called, because they would not have sent + // a SEND_FLOWFILES request to use. Just decode the bytes into FlowFiles until peer says he's + // finished sending data. + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + boolean continueTransaction = true; + String calculatedCRC = ""; + while (continueTransaction) { + final long startNanos = System.nanoTime(); + final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis; + final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); + + FlowFile flowFile = codec.decode(checkedInputStream, session); + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis); + session.transfer(flowFile, Relationship.ANONYMOUS); + flowFilesReceived.add(flowFile); + bytesReceived += flowFile.getSize(); + + final Response transactionResponse = Response.read(dis); + switch (transactionResponse.getCode()) { + case CONTINUE_TRANSACTION: + logger.debug("{} Received ContinueTransaction indicator from {}", this, peer); + break; + case FINISH_TRANSACTION: + logger.debug("{} Received FinishTransaction indicator from {}", this, peer); + continueTransaction = false; + calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue()); + break; + default: + throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse); + } + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. 
+ logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); + + final Response confirmTransactionResponse = Response.read(dis); + logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + session.rollback(); + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + // Commit the session so that we have persisted the data + session.commit(); + - if ( session.getAvailableRelationships().isEmpty() ) { ++ if ( context.getAvailableRelationships().isEmpty() ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); + } + + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesReceived.size(); + } + + @Override + public RequestType getRequestType(final Peer peer) throws IOException { + if ( !handshakeCompleted ) { + throw new IllegalStateException("Handshake has not been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Reading Request Type from {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()}); + final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream())); + logger.debug("{} Got Request Type {} from {}", new Object[] {this, requestType, peer}); + + return requestType; + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public void shutdown(final Peer peer) { + logger.debug("{} Shutting down with {}", this, peer); + shutdown = true; + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public void sendPeerList(final Peer peer) throws IOException { + if ( !handshakeCompleted ) { + throw new IllegalStateException("Handshake has not been completed"); + } + if ( shutdown ) { + throw new IllegalStateException("Protocol is shutdown"); + } + + logger.debug("{} Sending Peer List to {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + final 
NiFiProperties properties = NiFiProperties.getInstance(); + + // we have only 1 peer: ourselves. + dos.writeInt(1); + dos.writeUTF(InetAddress.getLocalHost().getHostName()); + dos.writeInt(properties.getRemoteInputPort()); + dos.writeBoolean(properties.isSiteToSiteSecure()); + dos.writeInt(0); // doesn't matter how many FlowFiles we have, because we're the only host. + dos.flush(); + } + + @Override + public String getResourceName() { + return RESOURCE_NAME; + } + + @Override + public void setNodeInformant(final NodeInformant nodeInformant) { + } + + @Override + public long getRequestExpiration() { + return requestExpirationMillis; + } + + @Override + public String toString() { + return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]"; + } + }
