mattyb149 commented on a change in pull request #5755:
URL: https://github.com/apache/nifi/pull/5755#discussion_r805913393



##########
File path: 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
##########
@@ -147,6 +180,140 @@ protected ConfigurationProviderInfo 
initContentTypeInfo(List<ConfigurationProvid
         return new ConfigurationProviderInfo(mediaTypeList, contentTypes, 
null);
     }
 
+    @POST
+    @Path("/heartbeat")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send a heartbeat to the 
C2 server",
+            response = C2HeartbeatResponse.class
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response heartbeat(
+            @Context HttpServletRequest request, @Context HttpHeaders 
httpHeaders, @Context UriInfo uriInfo,
+            @ApiParam(required = true) final C2Heartbeat heartbeat) {
+
+        logRequestEntry(heartbeat, heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+
+        try {
+            
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), 
uriInfo);
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        }
+
+        List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
+        boolean defaultAccept = false;
+        if (acceptValues.size() == 0) {
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
+            defaultAccept = true;
+        }
+        if (logger.isDebugEnabled()) {
+            StringBuilder builder = new StringBuilder("Handling request from ")
+                    .append(HttpRequestUtil.getClientString(request))
+                    .append(" with Accept");
+            if (defaultAccept) {
+                builder.append(" default value");
+            }
+            builder.append(": ")
+                    
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+            logger.debug(builder.toString());
+        }
+
+        try {
+            Map<String, List<String>> parameters = 
Collections.singletonMap("class", 
Collections.singletonList(heartbeat.getAgentClass()));
+            ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
+            org.apache.nifi.minifi.c2.api.Configuration configuration = 
configurationProviderValue.getConfiguration();
+            final C2ProtocolContext heartbeatContext = 
C2ProtocolContext.builder()
+                    .baseUri(configuration.getURL().toURI())
+                    
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                    .build();
+
+            Response response;
+
+            try {
+                final C2HeartbeatResponse heartbeatResponse = 
c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext);
+                response = Response.ok(heartbeatResponse).build();
+            } catch (Exception e) {
+                logger.error("Heartbeat processing failed", e);
+                response = 
Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+            }
+            logRequestProcessingFinished(heartbeat.getAgentId(), 
heartbeat.getIdentifier());
+            return response;
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        } catch (InvalidParameterException e) {
+            logger.info(HttpRequestUtil.getClientString(request) + " made 
invalid request with " + HttpRequestUtil.getQueryString(request), e);
+            return Response.status(400).entity("Invalid request.").build();
+        } catch (ConfigurationProviderException | URISyntaxException e) {
+            logger.warn("Unable to get configuration.", e);
+            return Response.status(500).build();
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException) {
+                throw (WebApplicationException) cause;
+            }
+            logger.error(HttpRequestUtil.getClientString(request) + " made 
request with " + HttpRequestUtil.getQueryString(request) + " that caused 
error.", cause);
+            return Response.status(500).entity("Internal error").build();
+        }
+    }
+
+    /**
+     * Receives an operation acknowledgement from a MiNiFi agent and passes it to the protocol
+     * service.
+     *
+     * <p>Request entry and completion are logged only when the acknowledgement carries an agent
+     * identifier.
+     *
+     * @param operationAck the acknowledgement payload sent by the agent
+     * @return 200 with an empty body once the acknowledgement has been processed; 400 when the
+     *         payload is missing
+     */
+    @POST
+    @Path("/acknowledge")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send an operation acknowledgement to the C2 server"
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response acknowledge(
+            @ApiParam(required = true) final C2OperationAck operationAck) {
+
+        // An empty request body arrives as a null entity; reject it here instead of failing
+        // with a NullPointerException while resolving the agent id.
+        if (operationAck == null) {
+            return Response.status(400).entity("Invalid request.").build();
+        }
+
+        final Optional<String> agentId = getAgentId(operationAck);
+
+        agentId.ifPresent(id -> logRequestEntry(operationAck, id, operationAck.getOperationId()));
+
+        final C2ProtocolContext ackContext = C2ProtocolContext.builder()
+                .baseUri(getBaseUri())
+                .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                .build();
+
+        c2ProtocolService.processOperationAck(operationAck, ackContext);
+
+        agentId.ifPresent(id -> logRequestProcessingFinished(id, operationAck.getOperationId()));
+
+        return Response.ok().build();
+    }
+
+    /**
+     * Extracts the agent identifier from an operation acknowledgement.
+     *
+     * @param operationAck the acknowledgement to inspect
+     * @return the identifier held by the acknowledgement's agent info, or an empty Optional when
+     *         either the agent info or its identifier is {@code null}
+     */
+    private Optional<String> getAgentId(C2OperationAck operationAck) {
+        // Optional.map yields empty when the mapped value is null, covering both null cases.
+        return Optional.ofNullable(operationAck.getAgentInfo())
+                .map(agentInfo -> agentInfo.getIdentifier());
+    }
+
+    private void logRequestEntry(Serializable request, String agentId, String 
requestId) {
+        Marker marker = getMarker(agentId);
+        logger.debug(marker, "Incoming request from agent [{}] with request id 
[{}]", agentId, requestId);

Review comment:
       Good point, will remove.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to