Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/497#discussion_r66080066
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SiteToSiteResource.java ---
    @@ -86,12 +156,958 @@ public Response getController() {
             final ControllerEntity entity = new ControllerEntity();
             entity.setController(controller);
     
    +        if (isEmpty(req.getHeader(HttpHeaders.PROTOCOL_VERSION))) {
    +            // This indicates the client uses older NiFi version,
    +            // which strictly read JSON properties and fail with unknown 
properties.
    +            // Convert result entity so that old version clients can 
understance.
    +            logger.debug("Converting result to provide backward 
compatibility...");
    +            controller.setRemoteSiteHttpListeningPort(null);
    +        }
    +
             // generate the response
             return clusterContext(noCache(Response.ok(entity))).build();
         }
     
    +    private Response.ResponseBuilder 
setCommonHeaders(Response.ResponseBuilder builder, Integer 
transportProtocolVersion) {
    +        return builder.header(HttpHeaders.PROTOCOL_VERSION, 
transportProtocolVersion)
    +                .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, 
transactionManager.getTransactionTtlSec());
    +    }
    +
    +    private Integer negotiateTransportProtocolVersion(@Context 
HttpServletRequest req) throws BadRequestException {
    +        String protocolVersionStr = 
req.getHeader(HttpHeaders.PROTOCOL_VERSION);
    +        if (isEmpty(protocolVersionStr)) {
    +            throw new BadRequestException("Protocol version was not 
specified.");
    +        }
    +
    +        final Integer requestedProtocolVersion;
    +        try {
    +            requestedProtocolVersion = Integer.valueOf(protocolVersionStr);
    +        } catch (NumberFormatException e) {
    +            throw new BadRequestException("Specified protocol version was 
not in a valid number format: " + protocolVersionStr);
    +        }
    +
    +        Integer protocolVersion;
    +        if 
(transportProtocolVersionNegotiator.isVersionSupported(requestedProtocolVersion))
 {
    +            return requestedProtocolVersion;
    +        } else {
    +            protocolVersion = 
transportProtocolVersionNegotiator.getPreferredVersion(requestedProtocolVersion);
    +        }
    +
    +        if (protocolVersion == null) {
    +            throw new BadRequestException("Specified protocol version is 
not supported: " + protocolVersionStr);
    +        }
    +        return protocolVersion;
    +    }
    +
    +
    +    /**
    +     * Returns the details of this NiFi.
    +     *
    +     * @param clientId Optional client id. If the client id is not 
specified, a new one will be generated. This value (whether specified or 
generated) is included in the response.
    +     * @return A controllerEntity.
    +     */
    +    @GET
    +    @Path("/peers")
    +    @Consumes(MediaType.WILDCARD)
    +    @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
    +    // TODO: @PreAuthorize("hasRole('ROLE_NIFI')")
    +    @ApiOperation(
    +            value = "Returns the details about this NiFi necessary to 
communicate via site to site",
    +            response = PeersEntity.class,
    +            authorizations = @Authorization(value = "NiFi", type = 
"ROLE_NIFI")
    +    )
    +    @ApiResponses(
    +            value = {
    +                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +                    @ApiResponse(code = 401, message = "Client could not 
be authenticated."),
    +                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
    +                    @ApiResponse(code = 409, message = "The request was 
valid but NiFi was not in the appropriate state to process it. Retrying the 
same request later may be successful.")
    +            }
    +    )
    +    public Response getPeers(
    +            @ApiParam(
    +            value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
    +            required = false
    +    )
    +    @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
    +    @Context HttpServletRequest req) {
    +
    +        if (!properties.isSiteToSiteHttpEnabled()) {
    +            return responseCreator.httpSiteToSiteIsNotEnabledResponse();
    +        }
    +
    +        final Integer transportProtocolVersion;
    +        try {
    +            transportProtocolVersion = 
negotiateTransportProtocolVersion(req);
    +        } catch (BadRequestException e) {
    +            return responseCreator.badRequestResponse(e);
    +        }
    +
    +        ArrayList<PeerDTO> peers;
    +
    +        if (properties.isNode()) {
    +            return responseCreator.nodeTypeErrorResponse(req.getPathInfo() 
+ " is only accessible on NCM or Standalone NiFi instance.");
    +        // TODO: NCM no longer exists.
    +        /*
    +        } else if (properties.isClusterManager()) {
    +            ClusterNodeInformation clusterNodeInfo = 
clusterManager.getNodeInformation();
    +            final Collection<NodeInformation> nodeInfos = 
clusterNodeInfo.getNodeInformation();
    +            peers = new ArrayList<>(nodeInfos.size());
    +            for (NodeInformation nodeInfo : nodeInfos) {
    +                if (nodeInfo.getSiteToSiteHttpApiPort() == null) {
    +                    continue;
    +                }
    +                PeerDTO peer = new PeerDTO();
    +                peer.setHostname(nodeInfo.getSiteToSiteHostname());
    +                peer.setPort(nodeInfo.getSiteToSiteHttpApiPort());
    +                peer.setSecure(nodeInfo.isSiteToSiteSecure());
    +                peer.setFlowFileCount(nodeInfo.getTotalFlowFiles());
    +                peers.add(peer);
    +            }
    +        */
    +        } else {
    +            // Standalone mode.
    +            PeerDTO peer = new PeerDTO();
    +            // req.getLocalName returns private IP address, that can't be 
accessed from client in some environments.
    +            // So, use the value defined in nifi.properties instead when 
it is defined.
    +            String remoteInputHost = properties.getRemoteInputHost();
    +            peer.setHostname(isEmpty(remoteInputHost) ? req.getLocalName() 
: remoteInputHost);
    +            peer.setPort(properties.getRemoteInputHttpPort());
    +            peer.setSecure(properties.isSiteToSiteSecure());
    +            peer.setFlowFileCount(0);  // doesn't matter how many 
FlowFiles we have, because we're the only host.
    +
    +            peers = new ArrayList<>(1);
    +            peers.add(peer);
    +
    +        }
    +
    +        PeersEntity entity = new PeersEntity();
    +        entity.setPeers(peers);
    +
    +        return 
clusterContext(noCache(setCommonHeaders(Response.ok(entity), 
transportProtocolVersion))).build();
    +    }
    +
    +    @POST
    +    @Produces(MediaType.APPLICATION_JSON)
    +    @Path("{portType}/{portId}/transactions")
    +    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
    +    @ApiOperation(
    +            value = "Create a transaction to this output port",
    +            response = TransactionResultEntity.class,
    +            authorizations = {
    +                    @Authorization(value = "Read Only", type = 
"ROLE_MONITOR"),
    +                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
    +                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
    +            }
    +    )
    +    @ApiResponses(
    +            value = {
    +                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +                    @ApiResponse(code = 401, message = "Client could not 
be authenticated."),
    +                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
    +                    @ApiResponse(code = 404, message = "The specified 
resource could not be found."),
    +                    @ApiResponse(code = 409, message = "The request was 
valid but NiFi was not in the appropriate state to process it. Retrying the 
same request later may be successful.")
    +            }
    +    )
    +    public Response createPortTransaction(
    +            @ApiParam(
    +                    value = "If the client id is not specified, new one 
will be generated. This value (whether specified or generated) is included in 
the response.",
    +                    required = false
    +            )
    +            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
    +            @ApiParam(
    +                    value = "The input port id.",
    +                    required = true
    +            )
    +            @PathParam("portType") String portType,
    +            @PathParam("portId") String portId,
    +            @Context HttpServletRequest req,
    +            @Context ServletContext context,
    +            @Context UriInfo uriInfo,
    +            InputStream inputStream) {
    +
    +        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId);
    +        if (validationResult.errResponse != null) {
    +            return validationResult.errResponse;
    +        }
    +
    +        if(!PORT_TYPE_INPUT.equals(portType) && 
!PORT_TYPE_OUTPUT.equals(portType)){
    +            return responseCreator.wrongPortTypeResponse(clientId, 
portType, portId);
    +        }
    +
    +        logger.debug("createPortTransaction request: clientId={}, 
portType={}, portId={}", clientId.getClientId(), portType, portId);
    +
    +        final ByteArrayOutputStream out = new ByteArrayOutputStream();
    +        final String transactionId = 
transactionManager.createTransaction();
    +        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
    +        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
    +
    +        try {
    +            // Execute handshake.
    +            initiateServerProtocol(peer, transportProtocolVersion);
    +
    +            TransactionResultEntity entity = new TransactionResultEntity();
    +            entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
    +            entity.setMessage("Handshake properties are valid, and port is 
running. A transaction is created:" + transactionId);
    +
    +            return responseCreator.locationResponse(uriInfo, portType, 
portId, transactionId, entity, transportProtocolVersion);
    +
    +        } catch (HandshakeException e) {
    +            transactionManager.cancelTransaction(transactionId);
    +            return responseCreator.handshakeExceptionResponse(e);
    +
    +        } catch (Exception e) {
    +            transactionManager.cancelTransaction(transactionId);
    +            return responseCreator.unexpectedErrorResponse(clientId, 
portId, e);
    +        }
    +    }
    +
    +    @POST
    +    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
    +    @Produces(MediaType.TEXT_PLAIN)
    +    @Path("input-ports/{portId}/transactions/{transactionId}/flow-files")
    +    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 
'ROLE_ADMIN')")
    +    @ApiOperation(
    +            value = "Transfer flow files to input port",
    +            response = TransactionResultEntity.class,
    +            authorizations = {
    +                    @Authorization(value = "Read Only", type = 
"ROLE_MONITOR"),
    +                    @Authorization(value = "Data Flow Manager", type = 
"ROLE_DFM"),
    +                    @Authorization(value = "Administrator", type = 
"ROLE_ADMIN")
    +            }
    +    )
    +    @ApiResponses(
    +            value = {
    +                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
    +                    @ApiResponse(code = 401, message = "Client could not 
be authenticated."),
    +                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
    +                    @ApiResponse(code = 404, message = "The specified 
resource could not be found."),
    +                    @ApiResponse(code = 409, message = "The request was 
valid but NiFi was not in the appropriate state to process it. Retrying the 
same request later may be successful.")
    +            }
    +    )
    +    public Response receiveFlowFiles(
    +            @ApiParam(
    +                    value = "If the client id is not specified, new one 
will be generated. This value (whether specified or generated) is included in 
the response.",
    +                    required = false
    +            )
    +            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
    +            @ApiParam(
    +                    value = "The input port id.",
    +                    required = true
    +            )
    +            @PathParam("portId") String portId,
    +            @PathParam("transactionId") String transactionId,
    +            @Context HttpServletRequest req,
    +            @Context ServletContext context,
    +            @Context UriInfo uriInfo,
    +            InputStream inputStream) {
    +
    +        final ValidateRequestResult validationResult = validateResult(req, 
clientId, portId, transactionId);
    +        if (validationResult.errResponse != null) {
    +            return validationResult.errResponse;
    +        }
    +
    +        logger.debug("receiveFlowFiles request: clientId={}, portId={}", 
clientId.getClientId(), portId);
    +
    +        final ByteArrayOutputStream out = new ByteArrayOutputStream();
    +        final Peer peer = constructPeer(req, inputStream, out, portId, 
transactionId);
    +        final int transportProtocolVersion = 
validationResult.transportProtocolVersion;
    +
    +        try {
    +            HttpFlowFileServerProtocol serverProtocol = 
initiateServerProtocol(peer, transportProtocolVersion);
    +            int numOfFlowFiles = 
serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol);
    +            logger.debug("finished receiving flow files, 
numOfFlowFiles={}", numOfFlowFiles);
    +            if (numOfFlowFiles < 1) {
    +                return Response.status(Response.Status.BAD_REQUEST)
    +                        .entity("Client should send request when there is 
data to send. There was no flow file sent.").build();
    +            }
    +        } catch (HandshakeException e) {
    +            return responseCreator.handshakeExceptionResponse(e);
    +
    +        } catch (NotAuthorizedException e) {
    +            return responseCreator.unauthorizedResponse(e);
    +
    +        } catch (BadRequestException | RequestExpiredException e) {
    +            return responseCreator.badRequestResponse(e);
    +
    +        } catch (Exception e) {
    +            return responseCreator.unexpectedErrorResponse(clientId, 
portId, e);
    +        }
    +
    +        String serverChecksum = 
((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getChecksum();
    +        return responseCreator.acceptedResponse(serverChecksum, 
transportProtocolVersion);
    +    }
    +
    +    private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, 
Integer transportProtocolVersion) throws IOException {
    +        // Switch transaction protocol version based on transport protocol 
version.
    +        TransportProtocolVersionNegotiator 
negotiatedTransportProtocolVersion = new 
TransportProtocolVersionNegotiator(transportProtocolVersion);
    +        VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
    +        HttpFlowFileServerProtocol serverProtocol = new 
HttpFlowFileServerProtocol(versionNegotiator);
    +        
HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
    +        // TODO: How should I pass cluster information?
    +        // serverProtocol.setNodeInformant(clusterManager);
    +        serverProtocol.handshake(peer);
    +        return serverProtocol;
    +    }
    +
    +    private Peer constructPeer(HttpServletRequest req, InputStream 
inputStream, OutputStream outputStream, String portId, String transactionId) {
    +        String clientHostName = req.getRemoteHost();
    +        int clientPort = req.getRemotePort();
    +
    +        PeerDescription peerDescription = new 
PeerDescription(clientHostName, clientPort, req.isSecure());
    +
    +        HttpServerCommunicationsSession commSession = new 
HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
    +
    +        boolean useCompression = false;
    +        final String useCompressionStr = 
req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION);
    +        if (!isEmpty(useCompressionStr) && 
Boolean.valueOf(useCompressionStr)) {
    +            useCompression = true;
    +        }
    +
    +        final String requestExpiration = 
req.getHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION);
    +        final String batchCount = 
req.getHeader(HANDSHAKE_PROPERTY_BATCH_COUNT);
    +        final String batchSize = 
req.getHeader(HANDSHAKE_PROPERTY_BATCH_SIZE);
    +        final String batchDuration = 
req.getHeader(HANDSHAKE_PROPERTY_BATCH_DURATION);
    +
    +        commSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
portId);
    +        commSession.putHandshakeParam(HandshakeProperty.GZIP, 
String.valueOf(useCompression));
    +
    +        if (!isEmpty(requestExpiration)) 
commSession.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, requestExpiration);
    +        if (!isEmpty(batchCount)) 
commSession.putHandshakeParam(BATCH_COUNT, batchCount);
    +        if (!isEmpty(batchSize)) commSession.putHandshakeParam(BATCH_SIZE, 
batchSize);
    +        if (!isEmpty(batchDuration)) 
commSession.putHandshakeParam(BATCH_DURATION, batchDuration);
    +
    +        if(peerDescription.isSecure()){
    +            NiFiUser nifiUser = NiFiUserUtils.getNiFiUser();
    +            logger.debug("initiating peer, nifiUser={}", nifiUser);
    +            commSession.setUserDn(nifiUser.getIdentity());
    +        }
    +
    +        // TODO: Followed how SocketRemoteSiteListener define peerUrl and 
clusterUrl, but it can be more meaningful values, especially for clusterUrl.
    +        String peerUrl = "nifi://" + clientHostName + ":" + clientPort;
    +        String clusterUrl = "nifi://localhost:" + req.getLocalPort();
    --- End diff --
    
    What would be a meaningful value for clusterUrl here, in place of the hard-coded "nifi://localhost:" + local port?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to