bejancsaba commented on code in PR #5755:
URL: https://github.com/apache/nifi/pull/5755#discussion_r857111580


##########
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationState;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+@Service
+public class SimpleC2ProtocolService implements C2ProtocolService {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SimpleC2ProtocolService.class);
+
+    private static final Set<String> issuedOperationIds = new HashSet<>();
+
+    private final Map<String, String> currentFlowIds;
+
+    public SimpleC2ProtocolService() {
+        currentFlowIds = new HashMap<>(1000);
+    }
+
+    @Override
+    public void processOperationAck(final C2OperationAck operationAck, final 
C2ProtocolContext context) {
+        // This service assumes there is a single Operation UPDATE to pass 
over the updated flow
+        logger.debug("Received operation acknowledgement: {}; {}", 
operationAck, context);
+        // Remove the operator ID from the list of issued operations and log 
the state
+        final String operationId = operationAck.getOperationId();
+        try {
+            OperationState opState = OperationState.DONE;
+            String details = null;
+
+            /* Partial applications are rare and only happen when an operation 
consists of updating multiple config
+             * items and some succeed ( we don't yet have the concept of 
rollback in agents ).
+             * Fully Applied yields an operation success.
+             * Operation Not Understood and Not Applied give little details 
but also will result in Operation Failure.
+             * We should explore if providing textual details. */
+            final C2OperationState c2OperationState = 
operationAck.getOperationState();
+            if (null != c2OperationState) {
+                details = c2OperationState.getDetails();
+                if (c2OperationState.getState() != 
C2OperationState.OperationState.FULLY_APPLIED) {
+                    opState = OperationState.FAILED;
+                }
+            }
+
+            if (!issuedOperationIds.remove(operationId)) {
+                logger.warn("Operation with ID " + operationId + " has either 
already been acknowledged or is unknown to this server");
+            } else if (null != c2OperationState) {
+                final C2OperationState.OperationState operationState = 
c2OperationState.getState();
+                logger.debug("Operation with ID " + operationId + " 
acknowledged with a state of " + operationState.name() + "(" + opState.name() + 
"), details = "
+                        + (details == null ? "" : details));
+            }
+
+            // Optionally, an acknowledgement can include some of the info 
normally passed in a heartbeat.
+            // If this info is present, process it as a heartbeat, so we 
update our latest known state of the agent.
+            if (operationAck.getAgentInfo() != null
+                    || operationAck.getDeviceInfo() != null
+                    || operationAck.getFlowInfo() != null) {
+                final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck);
+                logger.trace("Operation acknowledgement contains additional 
info. Processing as heartbeat: {}", heartbeatInfo);
+                processHeartbeat(heartbeatInfo, context);
+            }
+
+        } catch (final Exception e) {
+            logger.warn("Encountered exception while processing operation 
ack", e);
+        }
+    }
+
+    @Override
+    public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, 
final C2ProtocolContext context) {
+
+        C2HeartbeatResponse c2HeartbeatResponse = new C2HeartbeatResponse();
+        String currentFlowId = currentFlowIds.get(heartbeat.getAgentId());

Review Comment:
   I was thinking about this, and maybe I'm missing something, but the flowId 
is generated by the server and passed to the agent via the operation (as part 
of the URI in the location), right? When the agent heartbeats, the FlowInfo in 
the Heartbeat contains the flowId. If that is actually the case, then we don't 
need to track the agent-to-flowId mapping on the server side. Or is this not 
the approach we are working with?



##########
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java:
##########
@@ -147,6 +175,123 @@ protected ConfigurationProviderInfo 
initContentTypeInfo(List<ConfigurationProvid
         return new ConfigurationProviderInfo(mediaTypeList, contentTypes, 
null);
     }
 
+    @POST
+    @Path("/heartbeat")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send a heartbeat to the 
C2 server",
+            response = C2HeartbeatResponse.class
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response heartbeat(
+            @Context HttpServletRequest request, @Context HttpHeaders 
httpHeaders, @Context UriInfo uriInfo,
+            @ApiParam(required = true) final C2Heartbeat heartbeat) {
+
+        try {
+            
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), 
uriInfo);
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        }
+
+        List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
+        boolean defaultAccept = false;
+        if (acceptValues.size() == 0) {
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
+            defaultAccept = true;
+        }
+        if (logger.isDebugEnabled()) {
+            StringBuilder builder = new StringBuilder("Handling request from ")
+                    .append(HttpRequestUtil.getClientString(request))
+                    .append(" with Accept");
+            if (defaultAccept) {
+                builder.append(" default value");
+            }
+            builder.append(": ")
+                    
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+            logger.debug(builder.toString());
+        }
+
+        try {
+            final String flowId;
+            Map<String, List<String>> parameters = 
Collections.singletonMap("class", 
Collections.singletonList(heartbeat.getAgentClass()));
+            ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
+            org.apache.nifi.minifi.c2.api.Configuration configuration = 
configurationProviderValue.getConfiguration();
+            try (InputStream inputStream = configuration.getInputStream();

Review Comment:
   I'm not sure whether I'm reading this right, but if a heartbeat arrives 
with an unrecognised agent class (there is no flow definition for it on the 
server side), shouldn't we handle that more gracefully? I mean with something 
like a 4XX response, or at least without throwing an exception? Maybe 
@kevdoran has better insight here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to