http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
index 96f252b..0cde5cb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java
@@ -16,28 +16,86 @@
  */
 package org.apache.nifi.web.api;
 
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.io.http.HttpOutput;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
+import org.apache.nifi.remote.protocol.http.HttpHeaders;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.api.entity.PeersEntity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
 
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
-
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static 
org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE;
+import static 
org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS;
 
 /**
- * RESTful endpoint for managing a Flow Controller.
+ * RESTful endpoint for managing a SiteToSite connection.
  */
 @Path("/site-to-site")
 @Api(
@@ -46,7 +104,19 @@ import com.wordnik.swagger.annotations.Authorization;
 )
 public class SiteToSiteResource extends ApplicationResource {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(SiteToSiteResource.class);
+
+    public static final String CHECK_SUM = "checksum";
+    public static final String RESPONSE_CODE = "responseCode";
+
+
+    private static final String PORT_TYPE_INPUT = "input-ports";
+    private static final String PORT_TYPE_OUTPUT = "output-ports";
+
     private NiFiServiceFacade serviceFacade;
+    private final ResponseCreator responseCreator = new ResponseCreator();
+    private final VersionNegotiator transportProtocolVersionNegotiator = new 
TransportProtocolVersionNegotiator(1);
+    private final HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
 
     @Context
     private ResourceContext resourceContext;
@@ -73,7 +143,8 @@ public class SiteToSiteResource extends ApplicationResource {
                 @ApiResponse(code = 409, message = "The request was valid but 
NiFi was not in the appropriate state to process it. Retrying the same request 
later may be successful.")
             }
     )
-    public Response getController() {
+    public Response getController(
+            @Context HttpServletRequest req) {
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -86,12 +157,954 @@ public class SiteToSiteResource extends 
ApplicationResource {
         final ControllerEntity entity = new ControllerEntity();
         entity.setController(controller);
 
+        if (isEmpty(req.getHeader(HttpHeaders.PROTOCOL_VERSION))) {
+            // This indicates the client uses older NiFi version,
+            // which strictly read JSON properties and fail with unknown 
properties.
+            // Convert result entity so that old version clients can 
understance.
+            logger.debug("Converting result to provide backward 
compatibility...");
+            controller.setRemoteSiteHttpListeningPort(null);
+        }
+
         // generate the response
         return clusterContext(noCache(Response.ok(entity))).build();
     }
 
+    private Response.ResponseBuilder setCommonHeaders(Response.ResponseBuilder 
builder, Integer transportProtocolVersion) {
+        return builder.header(HttpHeaders.PROTOCOL_VERSION, 
transportProtocolVersion)
+                .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, 
transactionManager.getTransactionTtlSec());
+    }
+
+    private Integer negotiateTransportProtocolVersion(@Context 
HttpServletRequest req) throws BadRequestException {
+        String protocolVersionStr = 
req.getHeader(HttpHeaders.PROTOCOL_VERSION);
+        if (isEmpty(protocolVersionStr)) {
+            throw new BadRequestException("Protocol version was not 
specified.");
+        }
+
+        final Integer requestedProtocolVersion;
+        try {
+            requestedProtocolVersion = Integer.valueOf(protocolVersionStr);
+        } catch (NumberFormatException e) {
+            throw new BadRequestException("Specified protocol version was not 
in a valid number format: " + protocolVersionStr);
+        }
+
+        Integer protocolVersion;
+        if 
(transportProtocolVersionNegotiator.isVersionSupported(requestedProtocolVersion))
 {
+            return requestedProtocolVersion;
+        } else {
+            protocolVersion = 
transportProtocolVersionNegotiator.getPreferredVersion(requestedProtocolVersion);
+        }
+
+        if (protocolVersion == null) {
+            throw new BadRequestException("Specified protocol version is not 
supported: " + protocolVersionStr);
+        }
+        return protocolVersion;
+    }
+
+
+    /**
+     * Returns the available Peers and its status of this NiFi.
+     *
+     * @param clientId Optional client id. If the client id is not specified, 
a new one will be generated. This value (whether specified or generated) is 
included in the response.
+     * @return A peersEntity.
+     */
+    @GET
+    @Path("/peers")
+    @Consumes(MediaType.WILDCARD)
+    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+    // TODO: @PreAuthorize("hasRole('ROLE_NIFI')")
+    @ApiOperation(
+            value = "Returns the available Peers and its status of this NiFi",
+            response = PeersEntity.class,
+            authorizations = @Authorization(value = "NiFi", type = "ROLE_NIFI")
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response getPeers(
+            @ApiParam(
+            value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
+            required = false
+    )
+    @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter 
clientId,
+    @Context HttpServletRequest req) {
+
+        if (!properties.isSiteToSiteHttpEnabled()) {
+            return responseCreator.httpSiteToSiteIsNotEnabledResponse();
+        }
+
+        final Integer transportProtocolVersion;
+        try {
+            transportProtocolVersion = negotiateTransportProtocolVersion(req);
+        } catch (BadRequestException e) {
+            return responseCreator.badRequestResponse(e);
+        }
+
+        ArrayList<PeerDTO> peers;
+
+        if (properties.isNode()) {
+            return responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " 
is only accessible on NCM or Standalone NiFi instance.");
+        // TODO: NCM no longer exists.
+        /*
+        } else if (properties.isClusterManager()) {
+            ClusterNodeInformation clusterNodeInfo = 
clusterManager.getNodeInformation();
+            final Collection<NodeInformation> nodeInfos = 
clusterNodeInfo.getNodeInformation();
+            peers = new ArrayList<>(nodeInfos.size());
+            for (NodeInformation nodeInfo : nodeInfos) {
+                if (nodeInfo.getSiteToSiteHttpApiPort() == null) {
+                    continue;
+                }
+                PeerDTO peer = new PeerDTO();
+                peer.setHostname(nodeInfo.getSiteToSiteHostname());
+                peer.setPort(nodeInfo.getSiteToSiteHttpApiPort());
+                peer.setSecure(nodeInfo.isSiteToSiteSecure());
+                peer.setFlowFileCount(nodeInfo.getTotalFlowFiles());
+                peers.add(peer);
+            }
+        */
+        } else {
+            // Standalone mode.
+            PeerDTO peer = new PeerDTO();
+            // req.getLocalName returns private IP address, that can't be 
accessed from client in some environments.
+            // So, use the value defined in nifi.properties instead when it is 
defined.
+            String remoteInputHost = properties.getRemoteInputHost();
+            peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : 
remoteInputHost);
+            peer.setPort(properties.getRemoteInputHttpPort());
+            peer.setSecure(properties.isSiteToSiteSecure());
+            peer.setFlowFileCount(0);  // doesn't matter how many FlowFiles we 
have, because we're the only host.
+
+            peers = new ArrayList<>(1);
+            peers.add(peer);
+
+        }
+
+        PeersEntity entity = new PeersEntity();
+        entity.setPeers(peers);
+
+        return clusterContext(noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion))).build();
+    }
+
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{portType}/{portId}/transactions")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Create a transaction to the specified output port or 
input port",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response createPortTransaction(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portType") String portType,
+            @PathParam("portId") String portId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        if(!PORT_TYPE_INPUT.equals(portType) && 
!PORT_TYPE_OUTPUT.equals(portType)){
+            return responseCreator.wrongPortTypeResponse(clientId, portType, 
portId);
+        }
+
+        logger.debug("createPortTransaction request: clientId={}, portType={}, 
portId={}", clientId.getClientId(), portType, portId);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final String transactionId = transactionManager.createTransaction();
+        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+
+        try {
+            // Execute handshake.
+            initiateServerProtocol(peer, transportProtocolVersion);
+
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
+            entity.setMessage("Handshake properties are valid, and port is 
running. A transaction is created:" + transactionId);
+
+            return responseCreator.locationResponse(uriInfo, portType, portId, 
transactionId, entity, transportProtocolVersion);
+
+        } catch (HandshakeException e) {
+            transactionManager.cancelTransaction(transactionId);
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            transactionManager.cancelTransaction(transactionId);
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
e);
+        }
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.TEXT_PLAIN)
+    @Path("input-ports/{portId}/transactions/{transactionId}/flow-files")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Transfer flow files to the input port",
+            response = String.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response receiveFlowFiles(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("receiveFlowFiles request: clientId={}, portId={}", 
clientId.getClientId(), portId);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+
+        try {
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            int numOfFlowFiles = 
serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol);
+            logger.debug("finished receiving flow files, numOfFlowFiles={}", 
numOfFlowFiles);
+            if (numOfFlowFiles < 1) {
+                return Response.status(Response.Status.BAD_REQUEST)
+                        .entity("Client should send request when there is data 
to send. There was no flow file sent.").build();
+            }
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (NotAuthorizedException e) {
+            return responseCreator.unauthorizedResponse(e);
+
+        } catch (BadRequestException | RequestExpiredException e) {
+            return responseCreator.badRequestResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
e);
+        }
+
+        String serverChecksum = 
((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getChecksum();
+        return responseCreator.acceptedResponse(serverChecksum, 
transportProtocolVersion);
+    }
+
+    private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, 
Integer transportProtocolVersion) throws IOException {
+        // Switch transaction protocol version based on transport protocol 
version.
+        TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion 
= new TransportProtocolVersionNegotiator(transportProtocolVersion);
+        VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
+        HttpFlowFileServerProtocol serverProtocol = 
getHttpFlowFileServerProtocol(versionNegotiator);
+        
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
+        // TODO: How should I pass cluster information?
+        // serverProtocol.setNodeInformant(clusterManager);
+        serverProtocol.handshake(peer);
+        return serverProtocol;
+    }
+
+    HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator 
versionNegotiator) {
+        return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+    }
+
+    private Peer constructPeer(HttpServletRequest req, InputStream 
inputStream, OutputStream outputStream, String portId, String transactionId) {
+        String clientHostName = req.getRemoteHost();
+        int clientPort = req.getRemotePort();
+
+        PeerDescription peerDescription = new PeerDescription(clientHostName, 
clientPort, req.isSecure());
+
+        HttpServerCommunicationsSession commSession = new 
HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+
+        boolean useCompression = false;
+        final String useCompressionStr = 
req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION);
+        if (!isEmpty(useCompressionStr) && Boolean.valueOf(useCompressionStr)) 
{
+            useCompression = true;
+        }
+
+        final String requestExpiration = 
req.getHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION);
+        final String batchCount = 
req.getHeader(HANDSHAKE_PROPERTY_BATCH_COUNT);
+        final String batchSize = req.getHeader(HANDSHAKE_PROPERTY_BATCH_SIZE);
+        final String batchDuration = 
req.getHeader(HANDSHAKE_PROPERTY_BATCH_DURATION);
+
+        commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
portId);
+        commSession.putHandshakeParam(HandshakeProperty.GZIP, 
String.valueOf(useCompression));
+
+        if (!isEmpty(requestExpiration)) 
commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
+        if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, 
batchCount);
+        if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, 
batchSize);
+        if (!isEmpty(batchDuration)) 
commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
+
+        if(peerDescription.isSecure()){
+            NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
+            logger.debug("initiating peer, nifiUser={}", nifiUser);
+            commSession.setUserDn(nifiUser.getIdentity());
+        }
+
+        // TODO: Followed how SocketRemoteSiteListener define peerUrl and 
clusterUrl, but it can be more meaningful values, especially for clusterUrl.
+        String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
+        String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+        return new Peer(peerDescription, commSession, peerUrl, clusterUrl);
+    }
+
+    @DELETE
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("output-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Commit or cancel the specified transaction",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response commitOutputPortTransaction(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The response code. Available values are 
CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+                    required = true
+            )
+            @QueryParam(RESPONSE_CODE) Integer responseCode,
+            @ApiParam(
+                    value = "A checksum calculated at client side using CRC32 
to check flow file content integrity. It must match with the value calculated 
at server side.",
+                    required = true
+            )
+            @QueryParam(CHECK_SUM) @DefaultValue(StringUtils.EMPTY) String 
checksum,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @ApiParam(
+                    value = "The transaction id.",
+                    required = true
+            )
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+
+        logger.debug("commitOutputPortTransaction request: clientId={}, 
portId={}, transactionId={}", clientId, portId, transactionId);
+
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        try {
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+
+            String inputErrMessage = null;
+            if (responseCode == null) {
+                inputErrMessage = "responseCode is required.";
+            } else if(ResponseCode.CONFIRM_TRANSACTION.getCode() != 
responseCode
+                    && ResponseCode.CANCEL_TRANSACTION.getCode() != 
responseCode) {
+                inputErrMessage = "responseCode " + responseCode + " is 
invalid. ";
+            }
+
+            if (inputErrMessage != null){
+                entity.setMessage(inputErrMessage);
+                entity.setResponseCode(ResponseCode.ABORT.getCode());
+                return 
Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+            }
+
+            if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) {
+                return cancelTransaction(transactionId, entity);
+            }
+
+            int flowFileSent = serverProtocol.commitTransferTransaction(peer, 
checksum);
+            entity.setResponseCode(ResponseCode.CONFIRM_TRANSACTION.getCode());
+            entity.setFlowFileSent(flowFileSent);
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+            logger.error("Failed to process the request", e);
+            
if(ResponseCode.BAD_CHECKSUM.equals(commsSession.getResponseCode())){
+                
entity.setResponseCode(commsSession.getResponseCode().getCode());
+                entity.setMessage(e.getMessage());
+
+                Response.ResponseBuilder builder = 
Response.status(Response.Status.BAD_REQUEST).entity(entity);
+                return clusterContext(noCache(builder)).build();
+            }
+
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
transactionId, e);
+        }
+
+        return clusterContext(noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion))).build();
+    }
+
+
+    @DELETE
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("input-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Commit or cancel the specified transaction",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response commitInputPortTransaction(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The response code. Available values are 
BAD_CHECKSUM(19), CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+                    required = true
+            )
+            @QueryParam(RESPONSE_CODE) Integer responseCode,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @ApiParam(
+                    value = "The transaction id.",
+                    required = true
+            )
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("commitInputPortTransaction request: clientId={}, 
portId={}, transactionId={}, responseCode={}",
+                clientId, portId, transactionId, responseCode);
+
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        try {
+            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+            HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+            // Pass the response code sent from the client.
+            String inputErrMessage = null;
+            if (responseCode == null) {
+                inputErrMessage = "responseCode is required.";
+            } else if(ResponseCode.BAD_CHECKSUM.getCode() != responseCode
+                    && ResponseCode.CONFIRM_TRANSACTION.getCode() != 
responseCode
+                    && ResponseCode.CANCEL_TRANSACTION.getCode() != 
responseCode) {
+                inputErrMessage = "responseCode " + responseCode + " is 
invalid. ";
+            }
+
+            if (inputErrMessage != null){
+                entity.setMessage(inputErrMessage);
+                entity.setResponseCode(ResponseCode.ABORT.getCode());
+                return 
Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+            }
+
+            if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) {
+                return cancelTransaction(transactionId, entity);
+            }
+
+            commsSession.setResponseCode(ResponseCode.fromCode(responseCode));
+
+            try {
+                int flowFileSent = 
serverProtocol.commitReceiveTransaction(peer);
+                
entity.setResponseCode(commsSession.getResponseCode().getCode());
+                entity.setFlowFileSent(flowFileSent);
+
+            } catch (IOException e){
+                if (ResponseCode.BAD_CHECKSUM.getCode() == responseCode && 
e.getMessage().contains("Received a BadChecksum response")){
+                    // AbstractFlowFileServerProtocol throws IOException after 
it canceled transaction.
+                    // This is a known behavior and if we return 500 with this 
exception,
+                    // it's not clear if there is an issue at server side, or 
cancel operation has been accomplished.
+                    // Above conditions can guarantee this is the latter case, 
we return 200 OK here.
+                    
entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+                    return 
clusterContext(noCache(Response.ok(entity))).build();
+                } else {
+                    return responseCreator.unexpectedErrorResponse(clientId, 
portId, transactionId, e);
+                }
+            }
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
transactionId, e);
+        }
+
+        return clusterContext(noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion))).build();
+    }
+
+    private Response cancelTransaction(String transactionId, 
TransactionResultEntity entity) {
+        transactionManager.cancelTransaction(transactionId);
+        entity.setMessage("Transaction has been canceled.");
+        entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+        return Response.ok(entity).build();
+    }
+
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    @Path("output-ports/{portId}/transactions/{transactionId}/flow-files")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Transfer flow files from the output port",
+            response = StreamingOutput.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 200, message = "There is no flow file 
to return."),
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response transferFlowFiles(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("transferFlowFiles request: clientId={}, portId={}", 
clientId.getClientId(), portId);
+
+        // Before opening the real output stream for HTTP response,
+        // use this temporary output stream to buffer handshake result.
+        final ByteArrayOutputStream tempBos = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, tempBos, portId, 
transactionId);
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+        try {
+            final HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
+
+            StreamingOutput flowFileContent = new StreamingOutput() {
+                @Override
+                public void write(OutputStream outputStream) throws 
IOException, WebApplicationException {
+
+                    HttpOutput output = 
(HttpOutput)peer.getCommunicationsSession().getOutput();
+                    output.setOutputStream(outputStream);
+
+                    try {
+                        int numOfFlowFiles = 
serverProtocol.getPort().transferFlowFiles(peer, serverProtocol);
+                        logger.debug("finished transferring flow files, 
numOfFlowFiles={}", numOfFlowFiles);
+                        if(numOfFlowFiles < 1){
+                            // There was no flow file to transfer. Throw this 
exception to stop responding with SEE OTHER.
+                            throw new 
WebApplicationException(Response.Status.OK);
+                        }
+                    } catch (NotAuthorizedException | BadRequestException | 
RequestExpiredException e) {
+                        // Handshake is done outside of write() method, so 
these exception wouldn't be thrown.
+                        throw new IOException("Failed to process the 
request.", e);
+                    }
+                }
+
+            };
+
+            return responseCreator.acceptedResponse(flowFileContent, 
transportProtocolVersion);
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
e);
+        }
+    }
+
+    @PUT
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("input-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Extend transaction TTL",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response extendInputPortTransactionTTL(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+        return extendPortTransactionTTL(clientId, PORT_TYPE_INPUT, portId, 
transactionId, req, res, context, uriInfo, inputStream);
+
+    }
+
+    @PUT
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("output-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Extend transaction TTL",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not 
ready for serving request, or temporarily overloaded. Retrying the same request 
later may be successful"),
+            }
+    )
+    public Response extendOutputPortTransactionTTL(
+            @ApiParam(
+                    value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
+                    required = false
+            )
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+        return extendPortTransactionTTL(clientId, PORT_TYPE_OUTPUT, portId, 
transactionId, req, res, context, uriInfo, inputStream);
+
+    }
+
+    public Response extendPortTransactionTTL(
+            ClientIdParameter clientId,
+            String portType,
+            String portId,
+            String transactionId,
+            HttpServletRequest req,
+            HttpServletResponse res,
+            ServletContext context,
+            UriInfo uriInfo,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        if(!PORT_TYPE_INPUT.equals(portType) && 
!PORT_TYPE_OUTPUT.equals(portType)){
+            return responseCreator.wrongPortTypeResponse(clientId, portType, 
portId);
+        }
+
+        logger.debug("extendOutputPortTransactionTTL request: clientId={}, 
portType={}, portId={}, transactionId={}",
+                clientId.getClientId(), portType, portId, transactionId);
+
+        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
+
+        try {
+            // Do handshake
+            initiateServerProtocol(peer, transportProtocolVersion);
+            transactionManager.extendsTransaction(transactionId);
+
+            final TransactionResultEntity entity = new 
TransactionResultEntity();
+            
entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());
+            entity.setMessage("Extended TTL.");
+            return 
clusterContext(noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion))).build();
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(clientId, portId, 
transactionId, e);
+        }
+
+    }
+
+    private class ValidateRequestResult {
+        private Integer transportProtocolVersion;
+        private Response errResponse;
+    }
+
+    private ValidateRequestResult validateResult(HttpServletRequest req, 
ClientIdParameter clientId, String portId) {
+        return validateResult(req, clientId, portId, null);
+    }
+
+    private ValidateRequestResult validateResult(HttpServletRequest req, 
ClientIdParameter clientId, String portId, String transactionId) {
+        ValidateRequestResult result = new ValidateRequestResult();
+        if(!properties.isSiteToSiteHttpEnabled()) {
+            result.errResponse = 
responseCreator.httpSiteToSiteIsNotEnabledResponse();
+            return result;
+        }
+
+        // TODO: NCM no longer exists.
+        /*
+        if (properties.isClusterManager()) {
+            result.errResponse = 
responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on 
a NiFi Cluster Manager.");
+            return result;
+        }
+        */
+
+
+        try {
+            result.transportProtocolVersion = 
negotiateTransportProtocolVersion(req);
+        } catch (BadRequestException e) {
+            result.errResponse = responseCreator.badRequestResponse(e);
+            return result;
+        }
+
+        if(!isEmpty(transactionId) && 
!transactionManager.isTransactionActive(transactionId)) {
+            result.errResponse = 
responseCreator.transactionNotFoundResponse(clientId, portId, transactionId);
+            return  result;
+        }
+
+        return result;
+    }
+
+
     // setters
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {
         this.serviceFacade = serviceFacade;
     }
+
+
+    private class ResponseCreator {
+
+        private Response nodeTypeErrorResponse(String errMsg) {
+            return 
noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity(errMsg).build();
+        }
+
+        private Response httpSiteToSiteIsNotEnabledResponse() {
+            return 
noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity("HTTP(S)
 Site-to-Site is not enabled on this host.").build();
+        }
+
+        private Response wrongPortTypeResponse(ClientIdParameter clientId, 
String portType, String portId) {
+            logger.debug("Port type was wrong. clientId={}, portType={}, 
portId={}", clientId.getClientId(), portType, portId);
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.ABORT.getCode());
+            entity.setMessage("Port was not found.");
+            entity.setFlowFileSent(0);
+            return 
Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        private Response transactionNotFoundResponse(ClientIdParameter 
clientId, String portId, String transactionId) {
+            logger.debug("Transaction was not found. clientId={}, portId={}, 
transactionId={}", clientId.getClientId(), portId, transactionId);
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.ABORT.getCode());
+            entity.setMessage("Transaction was not found.");
+            entity.setFlowFileSent(0);
+            return 
Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        private Response unexpectedErrorResponse(ClientIdParameter clientId, 
String portId, Exception e) {
+            logger.error("Unexpected exception occurred. clientId={}, 
portId={}", clientId.getClientId(), portId);
+            logger.error("Exception detail:", e);
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.ABORT.getCode());
+            entity.setMessage("Server encountered an exception.");
+            entity.setFlowFileSent(0);
+            return 
Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        private Response unexpectedErrorResponse(ClientIdParameter clientId, 
String portId, String transactionId, Exception e) {
+            logger.error("Unexpected exception occurred. clientId={}, 
portId={}, transactionId={}", clientId.getClientId(), portId, transactionId);
+            logger.error("Exception detail:", e);
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.ABORT.getCode());
+            entity.setMessage("Server encountered an exception.");
+            entity.setFlowFileSent(0);
+            return 
Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        private Response unauthorizedResponse(NotAuthorizedException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Client request was not authorized. {}", 
e.getMessage());
+            }
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.UNAUTHORIZED.getCode());
+            entity.setMessage(e.getMessage());
+            entity.setFlowFileSent(0);
+            return 
Response.status(Response.Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON_TYPE).entity(e.getMessage()).build();
+        }
+
+        private Response badRequestResponse(Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Client sent a bad request. {}", e.getMessage());
+            }
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.ABORT.getCode());
+            entity.setMessage(e.getMessage());
+            entity.setFlowFileSent(0);
+            return 
Response.status(Response.Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        private Response handshakeExceptionResponse(HandshakeException e) {
+            if(logger.isDebugEnabled()){
+                logger.debug("Handshake failed, {}", e.getMessage());
+            }
+            ResponseCode handshakeRes = e.getResponseCode();
+            Response.Status statusCd;
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(handshakeRes != null ? 
handshakeRes.getCode() : ResponseCode.ABORT.getCode());
+            entity.setMessage(e.getMessage());
+            entity.setFlowFileSent(0);
+            switch (handshakeRes) {
+                case PORT_NOT_IN_VALID_STATE:
+                case PORTS_DESTINATION_FULL:
+                    return 
Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+                case UNAUTHORIZED:
+                    statusCd = Response.Status.UNAUTHORIZED;
+                    break;
+                case UNKNOWN_PORT:
+                    statusCd = NOT_FOUND;
+                    break;
+                default:
+                    statusCd = Response.Status.BAD_REQUEST;
+            }
+            return 
Response.status(statusCd).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        private Response acceptedResponse(Object entity, Integer 
protocolVersion) {
+            return 
noCache(setCommonHeaders(Response.status(Response.Status.ACCEPTED), 
protocolVersion))
+                    .entity(entity).build();
+        }
+
+        private Response locationResponse(UriInfo uriInfo, String portType, 
String portId, String transactionId, Object entity, Integer protocolVersion) {
+            String path = "/site-to-site/" + portType + "/" + portId + 
"/transactions/" + transactionId;
+            URI location = uriInfo.getBaseUriBuilder().path(path).build();
+            return noCache(setCommonHeaders(Response.created(location), 
protocolVersion)
+                    .header(LOCATION_URI_INTENT_NAME, 
LOCATION_URI_INTENT_VALUE))
+                    .entity(entity).build();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 8ae07f2..d5944f9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -127,6 +127,7 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.web.FlowModification;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO.AllowableValueDTO;
@@ -172,6 +173,7 @@ public final class DtoFactory {
             return 
Collator.getInstance(Locale.US).compare(class1.getSimpleName(), 
class2.getSimpleName());
         }
     };
+    public static final String SENSITIVE_VALUE_MASK = "********";
 
     private ControllerServiceProvider controllerServiceProvider;
     private EntityFactory entityFactory;
@@ -1140,7 +1142,7 @@ public final class DtoFactory {
             // determine the property value - don't include sensitive 
properties
             String propertyValue = entry.getValue();
             if (propertyValue != null && descriptor.isSensitive()) {
-                propertyValue = "********";
+                propertyValue = SENSITIVE_VALUE_MASK;
             }
 
             // set the property value
@@ -1204,7 +1206,7 @@ public final class DtoFactory {
             // determine the property value - don't include sensitive 
properties
             String propertyValue = entry.getValue();
             if (propertyValue != null && descriptor.isSensitive()) {
-                propertyValue = "********";
+                propertyValue = SENSITIVE_VALUE_MASK;
             }
 
             // set the property value
@@ -1292,7 +1294,7 @@ public final class DtoFactory {
                 // determine the property value - don't include sensitive 
properties
                 String propertyValue = entry.getValue();
                 if (propertyValue != null && descriptor.isSensitive()) {
-                    propertyValue = "********";
+                    propertyValue = SENSITIVE_VALUE_MASK;
                 }
 
                 // set the property value
@@ -1395,6 +1397,13 @@ public final class DtoFactory {
         dto.setTargetUri(group.getTargetUri().toString());
         dto.setFlowRefreshed(group.getLastRefreshTime());
         dto.setContents(contents);
+        dto.setTransportProtocol(group.getTransportProtocol().name());
+        dto.setProxyHost(group.getProxyHost());
+        dto.setProxyPort(group.getProxyPort());
+        dto.setProxyUser(group.getProxyUser());
+        if (!StringUtils.isEmpty(group.getProxyPassword())) {
+            dto.setProxyPassword(SENSITIVE_VALUE_MASK);
+        }
 
         // only specify the secure flag if we know the target system has site 
to site enabled
         if (group.isSiteToSiteEnabled()) {
@@ -2273,7 +2282,7 @@ public final class DtoFactory {
             // determine the property value - don't include sensitive 
properties
             String propertyValue = entry.getValue();
             if (propertyValue != null && descriptor.isSensitive()) {
-                propertyValue = "********";
+                propertyValue = SENSITIVE_VALUE_MASK;
             }
 
             // set the property value
@@ -2600,6 +2609,11 @@ public final class DtoFactory {
         
copy.setInactiveRemoteOutputPortCount(original.getInactiveRemoteOutputPortCount());
         copy.setParentGroupId(original.getParentGroupId());
         copy.setTargetUri(original.getTargetUri());
+        copy.setTransportProtocol(original.getTransportProtocol());
+        copy.setProxyHost(original.getProxyHost());
+        copy.setProxyPort(original.getProxyPort());
+        copy.setProxyUser(original.getProxyUser());
+        copy.setProxyPassword(original.getProxyPassword());
 
         copy.setContents(copyContents);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index dd55753..f234fde 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -694,6 +694,17 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
+     * Returns the http(s) port that the Cluster Manager is listening on for
+     * Site-to-Site communications
+     *
+     * @return the socket port that the Cluster Manager is listening on for
+     *         Site-to-Site communications
+     */
+    public Integer getClusterManagerRemoteSiteListeningHttpPort() {
+        return flowController.getClusterManagerRemoteSiteListeningHttpPort();
+    }
+
+    /**
      * Indicates whether or not Site-to-Site communications with the Cluster
      * Manager are secure
      *
@@ -716,6 +727,17 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
+     * Returns the http(s) port that the local instance is listening on for
+     * Site-to-Site communications
+     *
+     * @return the socket port that the local instance is listening on for
+     *         Site-to-Site communications
+     */
+    public Integer getRemoteSiteListeningHttpPort() {
+        return flowController.getRemoteSiteListeningHttpPort();
+    }
+
+    /**
      * Indicates whether or not Site-to-Site communications with the local
      * instance are secure
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index 8c877e2..bf4c96e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -18,11 +18,14 @@ package org.apache.nifi.web.dao.impl;
 
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.exception.ValidationException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@@ -34,6 +37,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 
+import static org.apache.nifi.util.StringUtils.isEmpty;
+
 public class StandardRemoteProcessGroupDAO extends ComponentDAO implements 
RemoteProcessGroupDAO {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class);
@@ -78,11 +83,8 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         // create the remote process group
         RemoteProcessGroup remoteProcessGroup = 
flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), 
rawTargetUri);
 
-        // update the remote process group
-        if (isNotNull(remoteProcessGroupDTO.getPosition())) {
-            remoteProcessGroup.setPosition(new 
Position(remoteProcessGroupDTO.getPosition().getX(), 
remoteProcessGroupDTO.getPosition().getY()));
-        }
-        remoteProcessGroup.setComments(remoteProcessGroupDTO.getComments());
+        // set other properties
+        updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);
 
         // get the group to add the remote process group to
         group.addRemoteProcessGroup(remoteProcessGroup);
@@ -134,10 +136,19 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         }
 
         // validate the proposed configuration
-        validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto);
+        final List<String> requestValidation = 
validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto);
+        // ensure there was no validation errors
+        if (!requestValidation.isEmpty()) {
+            throw new ValidationException(requestValidation);
+        }
 
         // if any remote group properties are changing, verify update
-        if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(), 
remoteProcessGroupDto.getCommunicationsTimeout())) {
+        if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(),
+                remoteProcessGroupDto.getCommunicationsTimeout(),
+                remoteProcessGroupDto.getProxyHost(),
+                remoteProcessGroupDto.getProxyPort(),
+                remoteProcessGroupDto.getProxyUser(),
+                remoteProcessGroupDto.getProxyPassword())) {
             remoteProcessGroup.verifyCanUpdate();
         }
     }
@@ -182,7 +193,12 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         }
 
         // validate the proposed configuration
-        validateProposedRemoteProcessGroupPortConfiguration(port, 
remoteProcessGroupPortDto);
+        final List<String> requestValidation = 
validateProposedRemoteProcessGroupPortConfiguration(port, 
remoteProcessGroupPortDto);
+        // ensure there was no validation errors
+        if (!requestValidation.isEmpty()) {
+            throw new ValidationException(requestValidation);
+        }
+
 
         // verify update when appropriate
         if 
(isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), 
remoteProcessGroupPortDto.getUseCompression())) {
@@ -222,7 +238,33 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
                 validationErrors.add("Yield duration is not a valid time 
duration (ie 30 sec, 5 min)");
             }
         }
+        String proxyPassword = remoteProcessGroupDTO.getProxyPassword();
+        String proxyUser = remoteProcessGroupDTO.getProxyUser();
+        String proxyHost = remoteProcessGroupDTO.getProxyHost();
+
+        if (isNotNull(remoteProcessGroupDTO.getProxyPort())) {
+            if (isEmpty(proxyHost)) {
+                validationErrors.add("Proxy port was specified, but proxy host 
was empty.");
+            }
+        }
 
+        if (!isEmpty(proxyUser)) {
+            if (isEmpty(proxyHost)) {
+                validationErrors.add("Proxy user name was specified, but proxy 
host was empty.");
+            }
+            if (isEmpty(proxyPassword)) {
+                validationErrors.add("User password should be specified if 
Proxy server needs user authentication.");
+            }
+        }
+
+        if (!isEmpty(proxyPassword)) {
+            if (isEmpty(proxyHost)) {
+                validationErrors.add("Proxy user password was specified, but 
proxy host was empty.");
+            }
+            if (isEmpty(proxyPassword)) {
+                validationErrors.add("User name should be specified if Proxy 
server needs user authentication.");
+            }
+        }
         return validationErrors;
     }
 
@@ -297,7 +339,12 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
     @Override
     public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO 
remoteProcessGroupDTO) {
         RemoteProcessGroup remoteProcessGroup = 
locateRemoteProcessGroup(remoteProcessGroupDTO.getId());
+        return updateRemoteProcessGroup(remoteProcessGroup, 
remoteProcessGroupDTO);
+
 
+    }
+
+    private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup 
remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) {
         // verify the update request
         verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO);
 
@@ -306,6 +353,12 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         final String comments = remoteProcessGroupDTO.getComments();
         final String communicationsTimeout = 
remoteProcessGroupDTO.getCommunicationsTimeout();
         final String yieldDuration = remoteProcessGroupDTO.getYieldDuration();
+        final String proxyHost = remoteProcessGroupDTO.getProxyHost();
+        final Integer proxyPort = remoteProcessGroupDTO.getProxyPort();
+        final String proxyUser = remoteProcessGroupDTO.getProxyUser();
+        final String proxyPassword = remoteProcessGroupDTO.getProxyPassword();
+
+        final String transportProtocol = 
remoteProcessGroupDTO.getTransportProtocol();
 
         if (isNotNull(name)) {
             remoteProcessGroup.setName(name);
@@ -322,6 +375,22 @@ public class StandardRemoteProcessGroupDAO extends 
ComponentDAO implements Remot
         if (isNotNull(remoteProcessGroupDTO.getPosition())) {
             remoteProcessGroup.setPosition(new 
Position(remoteProcessGroupDTO.getPosition().getX(), 
remoteProcessGroupDTO.getPosition().getY()));
         }
+        if (isNotNull(transportProtocol)) {
+            
remoteProcessGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase()));
+            // No null check because these proxy settings have to be clear if 
not specified.
+            // But when user Enable/Disable transmission, only isTransmitting 
is sent.
+            // To prevent clearing these values in that case, set these only 
if transportProtocol is sent,
+            // assuming UI sends transportProtocol always for update.
+            remoteProcessGroup.setProxyHost(proxyHost);
+            remoteProcessGroup.setProxyPort(proxyPort);
+            remoteProcessGroup.setProxyUser(proxyUser);
+            // Keep using current password when null or "********" was sent.
+            // Passing other values updates the password,
+            // specify empty String to clear password.
+            if (isNotNull(proxyPassword) && 
!DtoFactory.SENSITIVE_VALUE_MASK.equals(proxyPassword)) {
+                remoteProcessGroup.setProxyPassword(proxyPassword);
+            }
+        }
 
         final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
         if (isNotNull(isTransmitting)) {

Reply via email to