http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java index 96f252b..0cde5cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java @@ -16,28 +16,86 @@ */ package org.apache.nifi.web.api; +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.remote.HttpRemoteSiteListener; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; +import org.apache.nifi.remote.exception.BadRequestException; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.NotAuthorizedException; +import org.apache.nifi.remote.exception.RequestExpiredException; +import org.apache.nifi.remote.io.http.HttpOutput; +import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession; +import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol; +import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl; +import org.apache.nifi.remote.protocol.http.HttpHeaders; +import org.apache.nifi.remote.protocol.HandshakeProperty; +import org.apache.nifi.remote.protocol.ResponseCode; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.remote.PeerDTO; +import org.apache.nifi.web.api.entity.ControllerEntity; +import org.apache.nifi.web.api.entity.PeersEntity; +import org.apache.nifi.web.api.entity.TransactionResultEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.UriInfo; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.entity.ControllerEntity; - -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; +import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT; +import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION; +import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE; +import static org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS; /** - * RESTful endpoint for managing a Flow Controller. + * RESTful endpoint for managing a SiteToSite connection. */ @Path("/site-to-site") @Api( @@ -46,7 +104,19 @@ import com.wordnik.swagger.annotations.Authorization; ) public class SiteToSiteResource extends ApplicationResource { + private static final Logger logger = LoggerFactory.getLogger(SiteToSiteResource.class); + + public static final String CHECK_SUM = "checksum"; + public static final String RESPONSE_CODE = "responseCode"; + + + private static final String PORT_TYPE_INPUT = "input-ports"; + private static final String PORT_TYPE_OUTPUT = "output-ports"; + private NiFiServiceFacade serviceFacade; + private final ResponseCreator responseCreator = new ResponseCreator(); + private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); + private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(); @Context private ResourceContext resourceContext; @@ -73,7 +143,8 @@ public class SiteToSiteResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) - public Response getController() { + public Response getController( + @Context HttpServletRequest req) { if (isReplicateRequest()) { return replicate(HttpMethod.GET); @@ -86,12 +157,954 @@ public class SiteToSiteResource extends ApplicationResource { final ControllerEntity entity = new ControllerEntity(); entity.setController(controller); + if (isEmpty(req.getHeader(HttpHeaders.PROTOCOL_VERSION))) { + // This indicates the client uses older NiFi version, + // which strictly read JSON properties and fail with unknown properties. + // Convert result entity so that old version clients can understance. + logger.debug("Converting result to provide backward compatibility..."); + controller.setRemoteSiteHttpListeningPort(null); + } + // generate the response return clusterContext(noCache(Response.ok(entity))).build(); } + private Response.ResponseBuilder setCommonHeaders(Response.ResponseBuilder builder, Integer transportProtocolVersion) { + return builder.header(HttpHeaders.PROTOCOL_VERSION, transportProtocolVersion) + .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, transactionManager.getTransactionTtlSec()); + } + + private Integer negotiateTransportProtocolVersion(@Context HttpServletRequest req) throws BadRequestException { + String protocolVersionStr = req.getHeader(HttpHeaders.PROTOCOL_VERSION); + if (isEmpty(protocolVersionStr)) { + throw new BadRequestException("Protocol version was not specified."); + } + + final Integer requestedProtocolVersion; + try { + requestedProtocolVersion = Integer.valueOf(protocolVersionStr); + } catch (NumberFormatException e) { + throw new BadRequestException("Specified protocol version was not in a valid number format: " + protocolVersionStr); + } + + Integer protocolVersion; + if (transportProtocolVersionNegotiator.isVersionSupported(requestedProtocolVersion)) { + return requestedProtocolVersion; + } else { + protocolVersion = transportProtocolVersionNegotiator.getPreferredVersion(requestedProtocolVersion); + } + + if (protocolVersion == null) { + throw new BadRequestException("Specified protocol version is not supported: " + protocolVersionStr); + } + return protocolVersion; + } + + + /** + * Returns the available Peers and its status of this NiFi. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @return A peersEntity. + */ + @GET + @Path("/peers") + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + // TODO: @PreAuthorize("hasRole('ROLE_NIFI')") + @ApiOperation( + value = "Returns the available Peers and its status of this NiFi", + response = PeersEntity.class, + authorizations = @Authorization(value = "NiFi", type = "ROLE_NIFI") + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getPeers( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @Context HttpServletRequest req) { + + if (!properties.isSiteToSiteHttpEnabled()) { + return responseCreator.httpSiteToSiteIsNotEnabledResponse(); + } + + final Integer transportProtocolVersion; + try { + transportProtocolVersion = negotiateTransportProtocolVersion(req); + } catch (BadRequestException e) { + return responseCreator.badRequestResponse(e); + } + + ArrayList<PeerDTO> peers; + + if (properties.isNode()) { + return responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is only accessible on NCM or Standalone NiFi instance."); + // TODO: NCM no longer exists. + /* + } else if (properties.isClusterManager()) { + ClusterNodeInformation clusterNodeInfo = clusterManager.getNodeInformation(); + final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation(); + peers = new ArrayList<>(nodeInfos.size()); + for (NodeInformation nodeInfo : nodeInfos) { + if (nodeInfo.getSiteToSiteHttpApiPort() == null) { + continue; + } + PeerDTO peer = new PeerDTO(); + peer.setHostname(nodeInfo.getSiteToSiteHostname()); + peer.setPort(nodeInfo.getSiteToSiteHttpApiPort()); + peer.setSecure(nodeInfo.isSiteToSiteSecure()); + peer.setFlowFileCount(nodeInfo.getTotalFlowFiles()); + peers.add(peer); + } + */ + } else { + // Standalone mode. + PeerDTO peer = new PeerDTO(); + // req.getLocalName returns private IP address, that can't be accessed from client in some environments. + // So, use the value defined in nifi.properties instead when it is defined. + String remoteInputHost = properties.getRemoteInputHost(); + peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() : remoteInputHost); + peer.setPort(properties.getRemoteInputHttpPort()); + peer.setSecure(properties.isSiteToSiteSecure()); + peer.setFlowFileCount(0); // doesn't matter how many FlowFiles we have, because we're the only host. + + peers = new ArrayList<>(1); + peers.add(peer); + + } + + PeersEntity entity = new PeersEntity(); + entity.setPeers(peers); + + return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion))).build(); + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("{portType}/{portId}/transactions") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Create a transaction to the specified output port or input port", + response = TransactionResultEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response createPortTransaction( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("portType") String portType, + @PathParam("portId") String portId, + @Context HttpServletRequest req, + @Context ServletContext context, + @Context UriInfo uriInfo, + InputStream inputStream) { + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + if(!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)){ + return responseCreator.wrongPortTypeResponse(clientId, portType, portId); + } + + logger.debug("createPortTransaction request: clientId={}, portType={}, portId={}", clientId.getClientId(), portType, portId); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final String transactionId = transactionManager.createTransaction(); + final Peer peer = constructPeer(req, inputStream, out, portId, transactionId); + final int transportProtocolVersion = validationResult.transportProtocolVersion; + + try { + // Execute handshake. + initiateServerProtocol(peer, transportProtocolVersion); + + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode()); + entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId); + + return responseCreator.locationResponse(uriInfo, portType, portId, transactionId, entity, transportProtocolVersion); + + } catch (HandshakeException e) { + transactionManager.cancelTransaction(transactionId); + return responseCreator.handshakeExceptionResponse(e); + + } catch (Exception e) { + transactionManager.cancelTransaction(transactionId); + return responseCreator.unexpectedErrorResponse(clientId, portId, e); + } + } + + @POST + @Consumes(MediaType.APPLICATION_OCTET_STREAM) + @Produces(MediaType.TEXT_PLAIN) + @Path("input-ports/{portId}/transactions/{transactionId}/flow-files") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Transfer flow files to the input port", + response = String.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response receiveFlowFiles( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("portId") String portId, + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context ServletContext context, + InputStream inputStream) { + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId, transactionId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + logger.debug("receiveFlowFiles request: clientId={}, portId={}", clientId.getClientId(), portId); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Peer peer = constructPeer(req, inputStream, out, portId, transactionId); + final int transportProtocolVersion = validationResult.transportProtocolVersion; + + try { + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + int numOfFlowFiles = serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol); + logger.debug("finished receiving flow files, numOfFlowFiles={}", numOfFlowFiles); + if (numOfFlowFiles < 1) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Client should send request when there is data to send. There was no flow file sent.").build(); + } + } catch (HandshakeException e) { + return responseCreator.handshakeExceptionResponse(e); + + } catch (NotAuthorizedException e) { + return responseCreator.unauthorizedResponse(e); + + } catch (BadRequestException | RequestExpiredException e) { + return responseCreator.badRequestResponse(e); + + } catch (Exception e) { + return responseCreator.unexpectedErrorResponse(clientId, portId, e); + } + + String serverChecksum = ((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getChecksum(); + return responseCreator.acceptedResponse(serverChecksum, transportProtocolVersion); + } + + private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, Integer transportProtocolVersion) throws IOException { + // Switch transaction protocol version based on transport protocol version. + TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion = new TransportProtocolVersionNegotiator(transportProtocolVersion); + VersionNegotiator versionNegotiator = new StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion()); + HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator); + HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol); + // TODO: How should I pass cluster information? + // serverProtocol.setNodeInformant(clusterManager); + serverProtocol.handshake(peer); + return serverProtocol; + } + + HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) { + return new HttpFlowFileServerProtocolImpl(versionNegotiator); + } + + private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) { + String clientHostName = req.getRemoteHost(); + int clientPort = req.getRemotePort(); + + PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure()); + + HttpServerCommunicationsSession commSession = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId); + + boolean useCompression = false; + final String useCompressionStr = req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION); + if (!isEmpty(useCompressionStr) && Boolean.valueOf(useCompressionStr)) { + useCompression = true; + } + + final String requestExpiration = req.getHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION); + final String batchCount = req.getHeader(HANDSHAKE_PROPERTY_BATCH_COUNT); + final String batchSize = req.getHeader(HANDSHAKE_PROPERTY_BATCH_SIZE); + final String batchDuration = req.getHeader(HANDSHAKE_PROPERTY_BATCH_DURATION); + + commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId); + commSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression)); + + if (!isEmpty(requestExpiration)) commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration); + if (!isEmpty(batchCount)) commSession.putHandshakeParam(BATCH_COUNT, batchCount); + if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, batchSize); + if (!isEmpty(batchDuration)) commSession.putHandshakeParam(BATCH_DURATION, batchDuration); + + if(peerDescription.isSecure()){ + NiFiUser nifiUser = NiFiUserUtils.getNiFiUser(); + logger.debug("initiating peer, nifiUser={}", nifiUser); + commSession.setUserDn(nifiUser.getIdentity()); + } + + // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl. + String peerUrl = "nifi://" + clientHostName + ":" + clientPort; + String clusterUrl = "nifi://localhost:" + req.getLocalPort(); + return new Peer(peerDescription, commSession, peerUrl, clusterUrl); + } + + @DELETE + @Consumes(MediaType.APPLICATION_OCTET_STREAM) + @Produces(MediaType.APPLICATION_JSON) + @Path("output-ports/{portId}/transactions/{transactionId}") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Commit or cancel the specified transaction", + response = TransactionResultEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response commitOutputPortTransaction( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The response code. Available values are CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).", + required = true + ) + @QueryParam(RESPONSE_CODE) Integer responseCode, + @ApiParam( + value = "A checksum calculated at client side using CRC32 to check flow file content integrity. It must match with the value calculated at server side.", + required = true + ) + @QueryParam(CHECK_SUM) @DefaultValue(StringUtils.EMPTY) String checksum, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("portId") String portId, + @ApiParam( + value = "The transaction id.", + required = true + ) + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context ServletContext context, + InputStream inputStream) { + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId, transactionId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + + logger.debug("commitOutputPortTransaction request: clientId={}, portId={}, transactionId={}", clientId, portId, transactionId); + + final int transportProtocolVersion = validationResult.transportProtocolVersion; + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Peer peer = constructPeer(req, inputStream, out, portId, transactionId); + + final TransactionResultEntity entity = new TransactionResultEntity(); + try { + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + + String inputErrMessage = null; + if (responseCode == null) { + inputErrMessage = "responseCode is required."; + } else if(ResponseCode.CONFIRM_TRANSACTION.getCode() != responseCode + && ResponseCode.CANCEL_TRANSACTION.getCode() != responseCode) { + inputErrMessage = "responseCode " + responseCode + " is invalid. "; + } + + if (inputErrMessage != null){ + entity.setMessage(inputErrMessage); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + return Response.status(Response.Status.BAD_REQUEST).entity(entity).build(); + } + + if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) { + return cancelTransaction(transactionId, entity); + } + + int flowFileSent = serverProtocol.commitTransferTransaction(peer, checksum); + entity.setResponseCode(ResponseCode.CONFIRM_TRANSACTION.getCode()); + entity.setFlowFileSent(flowFileSent); + + } catch (HandshakeException e) { + return responseCreator.handshakeExceptionResponse(e); + + } catch (Exception e) { + HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + logger.error("Failed to process the request", e); + if(ResponseCode.BAD_CHECKSUM.equals(commsSession.getResponseCode())){ + entity.setResponseCode(commsSession.getResponseCode().getCode()); + entity.setMessage(e.getMessage()); + + Response.ResponseBuilder builder = Response.status(Response.Status.BAD_REQUEST).entity(entity); + return clusterContext(noCache(builder)).build(); + } + + return responseCreator.unexpectedErrorResponse(clientId, portId, transactionId, e); + } + + return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion))).build(); + } + + + @DELETE + @Consumes(MediaType.APPLICATION_OCTET_STREAM) + @Produces(MediaType.APPLICATION_JSON) + @Path("input-ports/{portId}/transactions/{transactionId}") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Commit or cancel the specified transaction", + response = TransactionResultEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response commitInputPortTransaction( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The response code. Available values are BAD_CHECKSUM(19), CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).", + required = true + ) + @QueryParam(RESPONSE_CODE) Integer responseCode, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("portId") String portId, + @ApiParam( + value = "The transaction id.", + required = true + ) + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context ServletContext context, + InputStream inputStream) { + + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId, transactionId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + logger.debug("commitInputPortTransaction request: clientId={}, portId={}, transactionId={}, responseCode={}", + clientId, portId, transactionId, responseCode); + + final int transportProtocolVersion = validationResult.transportProtocolVersion; + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Peer peer = constructPeer(req, inputStream, out, portId, transactionId); + + final TransactionResultEntity entity = new TransactionResultEntity(); + try { + HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession(); + // Pass the response code sent from the client. + String inputErrMessage = null; + if (responseCode == null) { + inputErrMessage = "responseCode is required."; + } else if(ResponseCode.BAD_CHECKSUM.getCode() != responseCode + && ResponseCode.CONFIRM_TRANSACTION.getCode() != responseCode + && ResponseCode.CANCEL_TRANSACTION.getCode() != responseCode) { + inputErrMessage = "responseCode " + responseCode + " is invalid. "; + } + + if (inputErrMessage != null){ + entity.setMessage(inputErrMessage); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + return Response.status(Response.Status.BAD_REQUEST).entity(entity).build(); + } + + if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) { + return cancelTransaction(transactionId, entity); + } + + commsSession.setResponseCode(ResponseCode.fromCode(responseCode)); + + try { + int flowFileSent = serverProtocol.commitReceiveTransaction(peer); + entity.setResponseCode(commsSession.getResponseCode().getCode()); + entity.setFlowFileSent(flowFileSent); + + } catch (IOException e){ + if (ResponseCode.BAD_CHECKSUM.getCode() == responseCode && e.getMessage().contains("Received a BadChecksum response")){ + // AbstractFlowFileServerProtocol throws IOException after it canceled transaction. + // This is a known behavior and if we return 500 with this exception, + // it's not clear if there is an issue at server side, or cancel operation has been accomplished. + // Above conditions can guarantee this is the latter case, we return 200 OK here. + entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode()); + return clusterContext(noCache(Response.ok(entity))).build(); + } else { + return responseCreator.unexpectedErrorResponse(clientId, portId, transactionId, e); + } + } + + } catch (HandshakeException e) { + return responseCreator.handshakeExceptionResponse(e); + + } catch (Exception e) { + return responseCreator.unexpectedErrorResponse(clientId, portId, transactionId, e); + } + + return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion))).build(); + } + + private Response cancelTransaction(String transactionId, TransactionResultEntity entity) { + transactionManager.cancelTransaction(transactionId); + entity.setMessage("Transaction has been canceled."); + entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode()); + return Response.ok(entity).build(); + } + + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_OCTET_STREAM) + @Path("output-ports/{portId}/transactions/{transactionId}/flow-files") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Transfer flow files from the output port", + response = StreamingOutput.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 200, message = "There is no flow file to return."), + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response transferFlowFiles( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The input port id.", + required = true + ) + @PathParam("portId") String portId, + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @Context ServletContext context, + InputStream inputStream) { + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId, transactionId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + logger.debug("transferFlowFiles request: clientId={}, portId={}", clientId.getClientId(), portId); + + // Before opening the real output stream for HTTP response, + // use this temporary output stream to buffer handshake result. + final ByteArrayOutputStream tempBos = new ByteArrayOutputStream(); + final Peer peer = constructPeer(req, inputStream, tempBos, portId, transactionId); + final int transportProtocolVersion = validationResult.transportProtocolVersion; + try { + final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion); + + StreamingOutput flowFileContent = new StreamingOutput() { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException { + + HttpOutput output = (HttpOutput)peer.getCommunicationsSession().getOutput(); + output.setOutputStream(outputStream); + + try { + int numOfFlowFiles = serverProtocol.getPort().transferFlowFiles(peer, serverProtocol); + logger.debug("finished transferring flow files, numOfFlowFiles={}", numOfFlowFiles); + if(numOfFlowFiles < 1){ + // There was no flow file to transfer. Throw this exception to stop responding with SEE OTHER. + throw new WebApplicationException(Response.Status.OK); + } + } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e) { + // Handshake is done outside of write() method, so these exception wouldn't be thrown. + throw new IOException("Failed to process the request.", e); + } + } + + }; + + return responseCreator.acceptedResponse(flowFileContent, transportProtocolVersion); + + } catch (HandshakeException e) { + return responseCreator.handshakeExceptionResponse(e); + + } catch (Exception e) { + return responseCreator.unexpectedErrorResponse(clientId, portId, e); + } + } + + @PUT + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("input-ports/{portId}/transactions/{transactionId}") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Extend transaction TTL", + response = TransactionResultEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response extendInputPortTransactionTTL( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @PathParam("portId") String portId, + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @Context ServletContext context, + @Context UriInfo uriInfo, + InputStream inputStream) { + + return extendPortTransactionTTL(clientId, PORT_TYPE_INPUT, portId, transactionId, req, res, context, uriInfo, inputStream); + + } + + @PUT + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("output-ports/{portId}/transactions/{transactionId}") + // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") + @ApiOperation( + value = "Extend transaction TTL", + response = TransactionResultEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), + @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"), + } + ) + public Response extendOutputPortTransactionTTL( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @PathParam("portId") String portId, + @PathParam("transactionId") String transactionId, + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @Context ServletContext context, + @Context UriInfo uriInfo, + InputStream inputStream) { + + return extendPortTransactionTTL(clientId, PORT_TYPE_OUTPUT, portId, transactionId, req, res, context, uriInfo, inputStream); + + } + + public Response extendPortTransactionTTL( + ClientIdParameter clientId, + String portType, + String portId, + String transactionId, + HttpServletRequest req, + HttpServletResponse res, + ServletContext context, + UriInfo uriInfo, + InputStream inputStream) { + + final ValidateRequestResult validationResult = validateResult(req, clientId, portId, transactionId); + if (validationResult.errResponse != null) { + return validationResult.errResponse; + } + + if(!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)){ + return responseCreator.wrongPortTypeResponse(clientId, portType, portId); + } + + logger.debug("extendOutputPortTransactionTTL request: clientId={}, portType={}, portId={}, transactionId={}", + clientId.getClientId(), portType, portId, transactionId); + + final int transportProtocolVersion = validationResult.transportProtocolVersion; + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Peer peer = constructPeer(req, inputStream, out, portId, transactionId); + + try { + // Do handshake + initiateServerProtocol(peer, transportProtocolVersion); + transactionManager.extendsTransaction(transactionId); + + final TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode()); + entity.setMessage("Extended TTL."); + return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion))).build(); + + } catch (HandshakeException e) { + return responseCreator.handshakeExceptionResponse(e); + + } catch (Exception e) { + return responseCreator.unexpectedErrorResponse(clientId, portId, transactionId, e); + } + + } + + private class ValidateRequestResult { + private Integer transportProtocolVersion; + private Response errResponse; + } + + private ValidateRequestResult validateResult(HttpServletRequest req, ClientIdParameter clientId, String portId) { + return validateResult(req, clientId, portId, null); + } + + private ValidateRequestResult validateResult(HttpServletRequest req, ClientIdParameter clientId, String portId, String transactionId) { + ValidateRequestResult result = new ValidateRequestResult(); + if(!properties.isSiteToSiteHttpEnabled()) { + result.errResponse = responseCreator.httpSiteToSiteIsNotEnabledResponse(); + return result; + } + + // TODO: NCM no longer exists. + /* + if (properties.isClusterManager()) { + result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager."); + return result; + } + */ + + + try { + result.transportProtocolVersion = negotiateTransportProtocolVersion(req); + } catch (BadRequestException e) { + result.errResponse = responseCreator.badRequestResponse(e); + return result; + } + + if(!isEmpty(transactionId) && !transactionManager.isTransactionActive(transactionId)) { + result.errResponse = responseCreator.transactionNotFoundResponse(clientId, portId, transactionId); + return result; + } + + return result; + } + + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; } + + + private class ResponseCreator { + + private Response nodeTypeErrorResponse(String errMsg) { + return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity(errMsg).build(); + } + + private Response httpSiteToSiteIsNotEnabledResponse() { + return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity("HTTP(S) Site-to-Site is not enabled on this host.").build(); + } + + private Response wrongPortTypeResponse(ClientIdParameter clientId, String portType, String portId) { + logger.debug("Port type was wrong. clientId={}, portType={}, portId={}", clientId.getClientId(), portType, portId); + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + entity.setMessage("Port was not found."); + entity.setFlowFileSent(0); + return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); + } + + private Response transactionNotFoundResponse(ClientIdParameter clientId, String portId, String transactionId) { + logger.debug("Transaction was not found. clientId={}, portId={}, transactionId={}", clientId.getClientId(), portId, transactionId); + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + entity.setMessage("Transaction was not found."); + entity.setFlowFileSent(0); + return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); + } + + private Response unexpectedErrorResponse(ClientIdParameter clientId, String portId, Exception e) { + logger.error("Unexpected exception occurred. clientId={}, portId={}", clientId.getClientId(), portId); + logger.error("Exception detail:", e); + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + entity.setMessage("Server encountered an exception."); + entity.setFlowFileSent(0); + return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); + } + + private Response unexpectedErrorResponse(ClientIdParameter clientId, String portId, String transactionId, Exception e) { + logger.error("Unexpected exception occurred. clientId={}, portId={}, transactionId={}", clientId.getClientId(), portId, transactionId); + logger.error("Exception detail:", e); + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + entity.setMessage("Server encountered an exception."); + entity.setFlowFileSent(0); + return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build(); + } + + private Response unauthorizedResponse(NotAuthorizedException e) { + if (logger.isDebugEnabled()) { + logger.debug("Client request was not authorized. {}", e.getMessage()); + } + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.UNAUTHORIZED.getCode()); + entity.setMessage(e.getMessage()); + entity.setFlowFileSent(0); + return Response.status(Response.Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON_TYPE).entity(e.getMessage()).build(); + } + + private Response badRequestResponse(Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Client sent a bad request. {}", e.getMessage()); + } + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(ResponseCode.ABORT.getCode()); + entity.setMessage(e.getMessage()); + entity.setFlowFileSent(0); + return Response.status(Response.Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); + } + + private Response handshakeExceptionResponse(HandshakeException e) { + if(logger.isDebugEnabled()){ + logger.debug("Handshake failed, {}", e.getMessage()); + } + ResponseCode handshakeRes = e.getResponseCode(); + Response.Status statusCd; + TransactionResultEntity entity = new TransactionResultEntity(); + entity.setResponseCode(handshakeRes != null ? handshakeRes.getCode() : ResponseCode.ABORT.getCode()); + entity.setMessage(e.getMessage()); + entity.setFlowFileSent(0); + switch (handshakeRes) { + case PORT_NOT_IN_VALID_STATE: + case PORTS_DESTINATION_FULL: + return Response.status(Response.Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); + case UNAUTHORIZED: + statusCd = Response.Status.UNAUTHORIZED; + break; + case UNKNOWN_PORT: + statusCd = NOT_FOUND; + break; + default: + statusCd = Response.Status.BAD_REQUEST; + } + return Response.status(statusCd).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build(); + } + + private Response acceptedResponse(Object entity, Integer protocolVersion) { + return noCache(setCommonHeaders(Response.status(Response.Status.ACCEPTED), protocolVersion)) + .entity(entity).build(); + } + + private Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity, Integer protocolVersion) { + String path = "/site-to-site/" + portType + "/" + portId + "/transactions/" + transactionId; + URI location = uriInfo.getBaseUriBuilder().path(path).build(); + return noCache(setCommonHeaders(Response.created(location), protocolVersion) + .header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE)) + .entity(entity).build(); + } + + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 8ae07f2..d5944f9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -127,6 +127,7 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.StringUtils; import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO.AllowableValueDTO; @@ -172,6 +173,7 @@ public final class DtoFactory { return Collator.getInstance(Locale.US).compare(class1.getSimpleName(), class2.getSimpleName()); } }; + public static final String SENSITIVE_VALUE_MASK = "********"; private ControllerServiceProvider controllerServiceProvider; private EntityFactory entityFactory; @@ -1140,7 +1142,7 @@ public final class DtoFactory { // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); if (propertyValue != null && descriptor.isSensitive()) { - propertyValue = "********"; + propertyValue = SENSITIVE_VALUE_MASK; } // set the property value @@ -1204,7 +1206,7 @@ public final class DtoFactory { // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); if (propertyValue != null && descriptor.isSensitive()) { - propertyValue = "********"; + propertyValue = SENSITIVE_VALUE_MASK; } // set the property value @@ -1292,7 +1294,7 @@ public final class DtoFactory { // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); if (propertyValue != null && descriptor.isSensitive()) { - propertyValue = "********"; + propertyValue = SENSITIVE_VALUE_MASK; } // set the property value @@ -1395,6 +1397,13 @@ public final class DtoFactory { dto.setTargetUri(group.getTargetUri().toString()); dto.setFlowRefreshed(group.getLastRefreshTime()); dto.setContents(contents); + dto.setTransportProtocol(group.getTransportProtocol().name()); + dto.setProxyHost(group.getProxyHost()); + dto.setProxyPort(group.getProxyPort()); + dto.setProxyUser(group.getProxyUser()); + if (!StringUtils.isEmpty(group.getProxyPassword())) { + dto.setProxyPassword(SENSITIVE_VALUE_MASK); + } // only specify the secure flag if we know the target system has site to site enabled if (group.isSiteToSiteEnabled()) { @@ -2273,7 +2282,7 @@ public final class DtoFactory { // determine the property value - don't include sensitive properties String propertyValue = entry.getValue(); if (propertyValue != null && descriptor.isSensitive()) { - propertyValue = "********"; + propertyValue = SENSITIVE_VALUE_MASK; } // set the property value @@ -2600,6 +2609,11 @@ public final class DtoFactory { copy.setInactiveRemoteOutputPortCount(original.getInactiveRemoteOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); copy.setTargetUri(original.getTargetUri()); + copy.setTransportProtocol(original.getTransportProtocol()); + copy.setProxyHost(original.getProxyHost()); + copy.setProxyPort(original.getProxyPort()); + copy.setProxyUser(original.getProxyUser()); + copy.setProxyPassword(original.getProxyPassword()); copy.setContents(copyContents); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index dd55753..f234fde 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -694,6 +694,17 @@ public class ControllerFacade implements Authorizable { } /** + * Returns the http(s) port that the Cluster Manager is listening on for + * Site-to-Site communications + * + * @return the socket port that the Cluster Manager is listening on for + * Site-to-Site communications + */ + public Integer getClusterManagerRemoteSiteListeningHttpPort() { + return flowController.getClusterManagerRemoteSiteListeningHttpPort(); + } + + /** * Indicates whether or not Site-to-Site communications with the Cluster * Manager are secure * @@ -716,6 +727,17 @@ public class ControllerFacade implements Authorizable { } /** + * Returns the http(s) port that the local instance is listening on for + * Site-to-Site communications + * + * @return the socket port that the local instance is listening on for + * Site-to-Site communications + */ + public Integer getRemoteSiteListeningHttpPort() { + return flowController.getRemoteSiteListeningHttpPort(); + } + + /** * Indicates whether or not Site-to-Site communications with the local * instance are secure * http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index 8c877e2..bf4c96e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -18,11 +18,14 @@ package org.apache.nifi.web.dao.impl; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; @@ -34,6 +37,8 @@ import java.util.List; import java.util.Set; import java.util.regex.Matcher; +import static org.apache.nifi.util.StringUtils.isEmpty; + public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO { private static final Logger logger = LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class); @@ -78,11 +83,8 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // create the remote process group RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), rawTargetUri); - // update the remote process group - if (isNotNull(remoteProcessGroupDTO.getPosition())) { - remoteProcessGroup.setPosition(new Position(remoteProcessGroupDTO.getPosition().getX(), remoteProcessGroupDTO.getPosition().getY())); - } - remoteProcessGroup.setComments(remoteProcessGroupDTO.getComments()); + // set other properties + updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO); // get the group to add the remote process group to group.addRemoteProcessGroup(remoteProcessGroup); @@ -134,10 +136,19 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot } // validate the proposed configuration - validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto); + final List<String> requestValidation = validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto); + // ensure there was no validation errors + if (!requestValidation.isEmpty()) { + throw new ValidationException(requestValidation); + } // if any remote group properties are changing, verify update - if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(), remoteProcessGroupDto.getCommunicationsTimeout())) { + if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(), + remoteProcessGroupDto.getCommunicationsTimeout(), + remoteProcessGroupDto.getProxyHost(), + remoteProcessGroupDto.getProxyPort(), + remoteProcessGroupDto.getProxyUser(), + remoteProcessGroupDto.getProxyPassword())) { remoteProcessGroup.verifyCanUpdate(); } } @@ -182,7 +193,12 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot } // validate the proposed configuration - validateProposedRemoteProcessGroupPortConfiguration(port, remoteProcessGroupPortDto); + final List<String> requestValidation = validateProposedRemoteProcessGroupPortConfiguration(port, remoteProcessGroupPortDto); + // ensure there was no validation errors + if (!requestValidation.isEmpty()) { + throw new ValidationException(requestValidation); + } + // verify update when appropriate if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression())) { @@ -222,7 +238,33 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot validationErrors.add("Yield duration is not a valid time duration (ie 30 sec, 5 min)"); } } + String proxyPassword = remoteProcessGroupDTO.getProxyPassword(); + String proxyUser = remoteProcessGroupDTO.getProxyUser(); + String proxyHost = remoteProcessGroupDTO.getProxyHost(); + + if (isNotNull(remoteProcessGroupDTO.getProxyPort())) { + if (isEmpty(proxyHost)) { + validationErrors.add("Proxy port was specified, but proxy host was empty."); + } + } + if (!isEmpty(proxyUser)) { + if (isEmpty(proxyHost)) { + validationErrors.add("Proxy user name was specified, but proxy host was empty."); + } + if (isEmpty(proxyPassword)) { + validationErrors.add("User password should be specified if Proxy server needs user authentication."); + } + } + + if (!isEmpty(proxyPassword)) { + if (isEmpty(proxyHost)) { + validationErrors.add("Proxy user password was specified, but proxy host was empty."); + } + if (isEmpty(proxyPassword)) { + validationErrors.add("User name should be specified if Proxy server needs user authentication."); + } + } return validationErrors; } @@ -297,7 +339,12 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot @Override public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) { RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupDTO.getId()); + return updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO); + + } + + private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) { // verify the update request verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO); @@ -306,6 +353,12 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot final String comments = remoteProcessGroupDTO.getComments(); final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout(); final String yieldDuration = remoteProcessGroupDTO.getYieldDuration(); + final String proxyHost = remoteProcessGroupDTO.getProxyHost(); + final Integer proxyPort = remoteProcessGroupDTO.getProxyPort(); + final String proxyUser = remoteProcessGroupDTO.getProxyUser(); + final String proxyPassword = remoteProcessGroupDTO.getProxyPassword(); + + final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol(); if (isNotNull(name)) { remoteProcessGroup.setName(name); @@ -322,6 +375,22 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot if (isNotNull(remoteProcessGroupDTO.getPosition())) { remoteProcessGroup.setPosition(new Position(remoteProcessGroupDTO.getPosition().getX(), remoteProcessGroupDTO.getPosition().getY())); } + if (isNotNull(transportProtocol)) { + remoteProcessGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase())); + // No null check because these proxy settings have to be clear if not specified. + // But when user Enable/Disable transmission, only isTransmitting is sent. + // To prevent clearing these values in that case, set these only if transportProtocol is sent, + // assuming UI sends transportProtocol always for update. + remoteProcessGroup.setProxyHost(proxyHost); + remoteProcessGroup.setProxyPort(proxyPort); + remoteProcessGroup.setProxyUser(proxyUser); + // Keep using current password when null or "********" was sent. + // Passing other values updates the password, + // specify empty String to clear password. + if (isNotNull(proxyPassword) && !DtoFactory.SENSITIVE_VALUE_MASK.equals(proxyPassword)) { + remoteProcessGroup.setProxyPassword(proxyPassword); + } + } final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting(); if (isNotNull(isTransmitting)) {
