bejancsaba commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1061405522
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
Review Comment:
Maybe we could be less specific we don't know if this is restart. We could
say "heartbeats are locked skipping sending for now" or something along these
lines. What do you think?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response)
{
}
}
- private void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart()
+ && !Optional.ofNullable(c2OperationAck)
Review Comment:
This could go te a dedicated function either here or on the C2OperationAck
class, also you could lose the negation if you call isEmpty in the end.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
+ } else {
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
+
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ }
} catch (Exception e) {
logger.error("Failed to send/process heartbeat:", e);
}
}
+ public void sendAcknowledge(C2OperationAck operationAck) {
+ try {
+ client.acknowledgeOperation(operationAck);
+ } catch (Exception e) {
+ logger.error("Failed to send acknowledge:", e);
+ }
+ }
+
+ public void enableHeartbeat() {
+ heartbeatLocked = false;
+ }
+
+ public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
+ LinkedList<C2Operation> lRequestedOperations = new
LinkedList<>(requestedOperations);
+ C2Operation requestedOperation;
+ while ((requestedOperation = lRequestedOperations.poll()) != null) {
+ Optional<C2OperationHandler> c2OperationHandler =
operationService.getHandlerForOperation(requestedOperation);
+ if (c2OperationHandler.isPresent()) {
+ Optional<C2OperationAck> c2OperationAck =
handleOperation(c2OperationHandler.get(), requestedOperation,
lRequestedOperations);
+ if (c2OperationAck.isPresent()) {
+ sendAcknowledge(c2OperationAck.get());
+ } else {
+ return;
Review Comment:
Can you add a little comment here or extract this limited logic to a well
named function? Simply reading the code is not evident why we return in this
case.
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java:
##########
@@ -128,16 +148,110 @@ void testSendHeartbeatNotAckWhenOperationAckMissing() {
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
hbResponse.setRequestedOperations(generateOperation(1));
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-
when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
verify(c2HeartbeatFactory).create(any());
verify(client).publishHeartbeat(heartbeat);
- verify(operationService).handleOperation(any());
verify(client, times(0)).acknowledgeOperation(any());
}
+ @Test
+ void shouldHeartbeatSendingNotPropagateExceptions() {
+ when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new
RuntimeException());
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+ }
+
+ @Test
+ void shouldAckSendingNotPropagateExceptions() {
+ C2OperationAck operationAck = mock(C2OperationAck.class);
+ doThrow(new
RuntimeException()).when(client).acknowledgeOperation(operationAck);
+
+ c2ClientService.sendAcknowledge(operationAck);
+ }
+
+ @Test
+ void
shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState()
{
Review Comment:
It is a long name (or a short book :)
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties
bootstrapProperties, String confDir, Proc
return process;
}
+ private boolean revertFlowConfig(Properties bootstrapProperties, String
confDir, File swapConfigFile) throws IOException {
+ DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change
configuration. Reverting to old configuration.");
+
+ try {
+ ByteBuffer tempConfigFile =
generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir,
bootstrapProperties);
+
runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+ } catch (ConfigurationChangeException e) {
+ DEFAULT_LOGGER.error("The swap file is malformed, unable to
restart from prior state. Will not attempt to restart MiNiFi. Swap File should
be cleaned up manually.");
+ return false;
+ }
+
+ Files.copy(swapConfigFile.toPath(),
Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)),
REPLACE_EXISTING);
+
+ DEFAULT_LOGGER.info("Replacing config file with swap file and deleting
swap file");
+ if (!swapConfigFile.delete()) {
+ DEFAULT_LOGGER.warn("The swap file failed to delete after
replacing using it to revert to the old configuration. It should be cleaned up
manually.");
+ }
+ runMiNiFi.setReloading(false);
+ return true;
+ }
+
+ private boolean revertBootstrapConfig(String confDir, File
bootstrapSwapConfigFile) throws IOException {
+ DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying
to change configuration. Reverting to old configuration.");
+
+ Files.copy(bootstrapSwapConfigFile.toPath(),
bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+ try {
+ ByteBuffer tempConfigFile =
generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()),
confDir, bootstrapFileProvider.getBootstrapProperties());
+
runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+ } catch (ConfigurationChangeException e) {
+ DEFAULT_LOGGER.error("The swap file is malformed, unable to
restart from prior state. Will not attempt to restart MiNiFi. Swap File should
be cleaned up manually.");
+ return false;
+ }
+
+ if (!bootstrapSwapConfigFile.delete()) {
+ DEFAULT_LOGGER.warn("The swap file failed to delete after
replacing using it to revert to the old configuration. It should be cleaned up
manually.");
Review Comment:
Don't we need to say which swap file? Maybe Logging the path as well would
be helpful. All of this feels like duplication but I understand why you didn't
abstract but half f this (and the previous) function should be easily extracted
right?
The delete swap file function from below can't be applied hetre?
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java:
##########
@@ -63,45 +67,63 @@ public void communicate() throws IOException {
}
try {
- processRequest(cmd, args);
+ processRequest(cmd, args, writer);
} catch (InvalidCommandException exception) {
throw new IOException("Received invalid command from MiNiFi: " +
line, exception);
}
}
- private void processRequest(String cmd, String[] args) throws
InvalidCommandException, IOException {
+ private void processRequest(String cmd, String[] args, BufferedWriter
writer) throws InvalidCommandException, IOException {
switch (cmd) {
case "PORT":
- handlePortCommand(args);
+ handlePortCommand(args, writer);
break;
case "STARTED":
- handleStartedCommand(args);
+ handleStartedCommand(args, writer);
break;
case "SHUTDOWN":
- handleShutDownCommand();
+ handleShutDownCommand(writer);
break;
case "RELOAD":
- handleReloadCommand();
+ handleReloadCommand(writer);
+ break;
+ case "UPDATE_PROPERTIES":
+ handlePropertiesUpdateCommand(args, writer);
+ break;
+ case "UPDATE_CONFIGURATION":
+ handleUpdateConfigurationCommand(args, writer);
break;
default:
throw new InvalidCommandException("Unknown command: " + cmd);
}
}
- private void handleReloadCommand() throws IOException {
+ private void handleUpdateConfigurationCommand(String[] args,
BufferedWriter writer) throws IOException {
Review Comment:
args is not used here and in the other function right?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java:
##########
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
*/
Map<String, Object> getProperties();
+ /**
+ * Determines if the given operation requires to restart the MiNiFi process
+ * @return true if it requires restart, false otherwise
+ */
+ default boolean requiresRestart() {
Review Comment:
Haven't really thought this through just a quick question. Doesn't this
depend somewhat on the client implementation? With the current one it makes
sense of course.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response)
{
}
}
- private void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart()
+ && !Optional.ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(state -> !FULLY_APPLIED.equals(state))
+ .isPresent();
+ }
+
+ private Optional<C2OperationAck> handleOperation(C2OperationHandler
c2OperationHandler, C2Operation requestedOperation, List<C2Operation>
requestedOperations) {
+ C2OperationAck c2OperationAck =
c2OperationHandler.handle(requestedOperation);
+ if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+ handleRestartableOperation(requestedOperations,
requestedOperation);
+ return Optional.empty();
+ } else {
+ return Optional.of(c2OperationAck);
+ }
+ }
+
+ private void handleRestartableOperation(List<C2Operation>
remainingOperations, C2Operation requestedOperation) {
+ // need to stop heartbeats till the restart happens
+ heartbeatLocked = true;
+
+ try {
+ requestedOperationDAO.save(new OperationQueue(requestedOperation,
remainingOperations));
+ c2OperationRegister.accept(requestedOperation);
+ } catch (Exception e) {
+ if (remainingOperations.isEmpty()) {
+ enableHeartbeat();
+ } else {
+ // reset saved queue and continue with remaining operations
+ requestedOperationDAO.reset();
Review Comment:
maybe cleanUp would be a more descriptive name instead of reset?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
+ } else {
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
+
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ }
} catch (Exception e) {
logger.error("Failed to send/process heartbeat:", e);
}
}
+ public void sendAcknowledge(C2OperationAck operationAck) {
+ try {
+ client.acknowledgeOperation(operationAck);
+ } catch (Exception e) {
+ logger.error("Failed to send acknowledge:", e);
+ }
+ }
+
+ public void enableHeartbeat() {
+ heartbeatLocked = false;
+ }
+
+ public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
+ LinkedList<C2Operation> lRequestedOperations = new
LinkedList<>(requestedOperations);
+ C2Operation requestedOperation;
+ while ((requestedOperation = lRequestedOperations.poll()) != null) {
Review Comment:
Wouldn't it be more "natural" to implement this with a PriorityQueue? It
would help readability and would clean slightly this function as well (I think).
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response)
{
}
}
- private void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart()
+ && !Optional.ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(state -> !FULLY_APPLIED.equals(state))
+ .isPresent();
+ }
+
+ private Optional<C2OperationAck> handleOperation(C2OperationHandler
c2OperationHandler, C2Operation requestedOperation, List<C2Operation>
requestedOperations) {
+ C2OperationAck c2OperationAck =
c2OperationHandler.handle(requestedOperation);
+ if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+ handleRestartableOperation(requestedOperations,
requestedOperation);
+ return Optional.empty();
+ } else {
+ return Optional.of(c2OperationAck);
+ }
+ }
+
+ private void handleRestartableOperation(List<C2Operation>
remainingOperations, C2Operation requestedOperation) {
+ // need to stop heartbeats till the restart happens
+ heartbeatLocked = true;
Review Comment:
We have enableHeartbeat function can we have a disableHeartbeat as well?
With that I don't think you need the comment here with function name it is
quite self explanatory.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
+ } else {
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
+
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ }
} catch (Exception e) {
logger.error("Failed to send/process heartbeat:", e);
}
}
+ public void sendAcknowledge(C2OperationAck operationAck) {
+ try {
+ client.acknowledgeOperation(operationAck);
+ } catch (Exception e) {
+ logger.error("Failed to send acknowledge:", e);
+ }
+ }
+
+ public void enableHeartbeat() {
+ heartbeatLocked = false;
+ }
+
+ public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
+ LinkedList<C2Operation> lRequestedOperations = new
LinkedList<>(requestedOperations);
+ C2Operation requestedOperation;
+ while ((requestedOperation = lRequestedOperations.poll()) != null) {
+ Optional<C2OperationHandler> c2OperationHandler =
operationService.getHandlerForOperation(requestedOperation);
Review Comment:
You could do the warning logging here on the service side in the get
function I think it belongs there, that way you could lose the isPresent here
and make the whole thing more fluent. What do you think?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.Optional;
+
+public interface RequestedOperationDAO {
Review Comment:
Maybe a class level comment / doc could be added to outline the purpose of
the interface.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java:
##########
@@ -36,6 +36,8 @@ public class C2NiFiProperties {
public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX +
"rest.connectionTimeout";
public static final String C2_READ_TIMEOUT = C2_PREFIX +
"rest.readTimeout";
public static final String C2_CALL_TIMEOUT = C2_PREFIX +
"rest.callTimeout";
+ public static final String C2_MAX_IDLE_CONNECTIONS = C2_PREFIX +
"rest.maxIdleConnections";
Review Comment:
This all feels somewhat duplicated can we utilise MiNiFiProperties here?
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java:
##########
@@ -22,32 +22,19 @@
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class C2OperationService {
-
- private static final Logger logger =
LoggerFactory.getLogger(C2OperationService.class);
+public class C2OperationHandlerProvider {
private final Map<OperationType, Map<OperandType, C2OperationHandler>>
handlerMap = new HashMap<>();
- public C2OperationService(List<C2OperationHandler> handlers) {
+ public C2OperationHandlerProvider(List<C2OperationHandler> handlers) {
for (C2OperationHandler handler : handlers) {
handlerMap.computeIfAbsent(handler.getOperationType(), x -> new
HashMap<>()).put(handler.getOperandType(), handler);
}
}
- public Optional<C2OperationAck> handleOperation(C2Operation operation) {
Review Comment:
Thanks, the class looks cleaner and serves a concrete purpose this way.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
+ } else {
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
+
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ }
} catch (Exception e) {
logger.error("Failed to send/process heartbeat:", e);
}
}
+ public void sendAcknowledge(C2OperationAck operationAck) {
+ try {
+ client.acknowledgeOperation(operationAck);
+ } catch (Exception e) {
+ logger.error("Failed to send acknowledge:", e);
+ }
+ }
+
+ public void enableHeartbeat() {
+ heartbeatLocked = false;
+ }
+
+ public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
+ LinkedList<C2Operation> lRequestedOperations = new
LinkedList<>(requestedOperations);
Review Comment:
I'm not sure whether the "l" prefix is intentional in the variable name (and
stands for local?) or not but I would vote against it :) You could rename the
parameter to something longer as it is used only once and simply use
requestedOperations inside the function.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static
org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+ @Mock
+ private ObjectMapper objectMapper;
+
+ @TempDir
+ File tmpDir;
+
+ private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+ @BeforeEach
+ void setup() {
+ fileBasedRequestedOperationDAO = new
FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+ }
+
+ @Test
+ void shouldSaveRequestedOperationsToFile() throws IOException {
+ OperationQueue operationQueue = getOperationQueue();
+ fileBasedRequestedOperationDAO.save(operationQueue);
+
+ verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+ }
+ @Test
+ void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws
IOException {
+ doThrow(new
RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+ assertThrows(RuntimeException.class, () ->
fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+ }
+
+ @Test
+ void shouldGetReturnEmptyWhenFileDoesntExists() {
+ assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+ }
+
+ @Test
+ void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+ new File(tmpDir.getAbsolutePath() + "/" +
REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+ doThrow(new
RuntimeException()).when(objectMapper).readValue(any(File.class),
eq(OperationQueue.class));
+
+ assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.get());
+ }
+
+ @Test
+ void shouldGetRequestedOperations() throws IOException {
+ new File(tmpDir.getAbsolutePath() + "/" +
REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+ OperationQueue operationQueue = getOperationQueue();
+ when(objectMapper.readValue(any(File.class),
eq(OperationQueue.class))).thenReturn(operationQueue);
+
+ assertEquals(Optional.of(operationQueue),
fileBasedRequestedOperationDAO.get());
+ }
+
Review Comment:
I know it is not fully unit test like but what about just calling write and
get on the dao and asserting the get result against what was passed to write?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ENABLE;
+import static
org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class PropertiesPersisterTest {
Review Comment:
Nice!
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+
+public class OperationQueue implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private C2Operation currentOperation;
+ private List<C2Operation> remainingOperations;
Review Comment:
As commented elsewhere I would use an actual queue instead of list.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesOperationHandler implements C2OperationHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class);
+
+ private final OperandPropertiesProvider operandPropertiesProvider;
+ private final Function<Map<String, String>, Boolean> persistProperties;
+
+ public UpdatePropertiesOperationHandler(OperandPropertiesProvider
operandPropertiesProvider, Function<Map<String, String>, Boolean>
persistProperties) {
+ this.operandPropertiesProvider = operandPropertiesProvider;
+ this.persistProperties = persistProperties;
+ }
+
+ @Override
+ public OperationType getOperationType() {
+ return UPDATE;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return operandPropertiesProvider.getProperties();
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(operation.getIdentifier());
+ C2OperationState operationState = new C2OperationState();
+ c2OperationAck.setOperationState(operationState);
+ try {
+ if (persistProperties.apply(operation.getArgs())) {
+ operationState.setState(FULLY_APPLIED);
+ } else {
+ LOGGER.info("Properties are already in desired state");
+ operationState.setState(NO_OPERATION);
+ }
+ } catch (IllegalArgumentException e) {
+ LOGGER.error(e.getMessage());
+ operationState.setState(NOT_APPLIED);
+ operationState.setDetails(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Exception happened during persisting properties");
Review Comment:
Can we log the exception as well, this way it doesn't really help the
investigation.
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+ protected static final String ID = "id";
Review Comment:
Why are these protected?
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -216,6 +213,46 @@ private Process restartNifi(Properties
bootstrapProperties, String confDir, Proc
return process;
}
+ private boolean revertFlowConfig(Properties bootstrapProperties, String
confDir, File swapConfigFile) throws IOException {
+ DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change
configuration. Reverting to old configuration.");
Review Comment:
Maybe we could also add here the extra info of Flow swap file exists
(similarly to the bootstrap message)
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/MiNiFiCommandState.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+public enum MiNiFiCommandState {
Review Comment:
Can you add class level doc explaining their purpose (just to be sure it is
not confused with the C2 Operation states)
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java:
##########
@@ -63,45 +67,63 @@ public void communicate() throws IOException {
}
try {
- processRequest(cmd, args);
+ processRequest(cmd, args, writer);
} catch (InvalidCommandException exception) {
throw new IOException("Received invalid command from MiNiFi: " +
line, exception);
}
}
- private void processRequest(String cmd, String[] args) throws
InvalidCommandException, IOException {
+ private void processRequest(String cmd, String[] args, BufferedWriter
writer) throws InvalidCommandException, IOException {
switch (cmd) {
case "PORT":
- handlePortCommand(args);
+ handlePortCommand(args, writer);
break;
case "STARTED":
- handleStartedCommand(args);
+ handleStartedCommand(args, writer);
break;
case "SHUTDOWN":
- handleShutDownCommand();
+ handleShutDownCommand(writer);
break;
case "RELOAD":
- handleReloadCommand();
+ handleReloadCommand(writer);
+ break;
+ case "UPDATE_PROPERTIES":
Review Comment:
What do you think about an Enum for these?
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+ private static final Logger logger =
LoggerFactory.getLogger(UpdateConfigurationService.class);
+ private static final String UPDATED_CONFIG_FILE_NAME =
"config-updated.yml";
+
+ private final Differentiator<ByteBuffer> differentiator;
+ private final RunMiNiFi runMiNiFi;
+ private final ConfigurationChangeListener
miNiFiConfigurationChangeListener;
+ private final BootstrapFileProvider bootstrapFileProvider;
+
+ public UpdateConfigurationService(RunMiNiFi runMiNiFi,
ConfigurationChangeListener miNiFiConfigurationChangeListener,
BootstrapFileProvider bootstrapFileProvider) {
+ this.differentiator =
WholeConfigDifferentiator.getByteBufferDifferentiator();
+ this.differentiator.initialize(runMiNiFi);
+ this.runMiNiFi = runMiNiFi;
+ this.miNiFiConfigurationChangeListener =
miNiFiConfigurationChangeListener;
+ this.bootstrapFileProvider = bootstrapFileProvider;
+ }
+
+ public Optional<MiNiFiCommandState> handleUpdate() {
+ logger.debug("Handling configuration update");
Review Comment:
I think it is quite rare and important could be logged on info level to show
what is happening.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+ private static final Logger logger =
LoggerFactory.getLogger(UpdateConfigurationService.class);
+ private static final String UPDATED_CONFIG_FILE_NAME =
"config-updated.yml";
+
+ private final Differentiator<ByteBuffer> differentiator;
+ private final RunMiNiFi runMiNiFi;
+ private final ConfigurationChangeListener
miNiFiConfigurationChangeListener;
+ private final BootstrapFileProvider bootstrapFileProvider;
+
+ public UpdateConfigurationService(RunMiNiFi runMiNiFi,
ConfigurationChangeListener miNiFiConfigurationChangeListener,
BootstrapFileProvider bootstrapFileProvider) {
+ this.differentiator =
WholeConfigDifferentiator.getByteBufferDifferentiator();
+ this.differentiator.initialize(runMiNiFi);
+ this.runMiNiFi = runMiNiFi;
+ this.miNiFiConfigurationChangeListener =
miNiFiConfigurationChangeListener;
+ this.bootstrapFileProvider = bootstrapFileProvider;
+ }
+
+ public Optional<MiNiFiCommandState> handleUpdate() {
+ logger.debug("Handling configuration update");
+ Optional<MiNiFiCommandState> commandState = Optional.empty();
Review Comment:
commandState doesn't need to be optional you can set it to null here and at
return you could do Optional.ofNullable.
##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java:
##########
@@ -172,13 +186,42 @@ void testCommunicateShouldHandleShutdownCommand() throws
IOException {
void testCommunicateShouldHandleReloadCommand() throws IOException {
InputStream inputStream = new
ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
- PeriodicStatusReporterManager periodicStatusReporterManager =
mock(PeriodicStatusReporterManager.class);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- BootstrapCodec bootstrapCodec = new BootstrapCodec(runner,
inputStream, outputStream);
-
when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
- bootstrapCodec.communicate();
+ bootstrapCodec.communicate(inputStream, outputStream);
+
+ assertEquals(OK, outputStream.toString().trim());
+ }
+
+ @Test
+ void shouldHandleUpdateConfigurationCommand() throws IOException {
Review Comment:
While I really like the "should" prefixed test naming for the sake of
consistency we should follow the convention of the already existing tests.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import
org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+ private static final Logger logger =
LoggerFactory.getLogger(UpdateConfigurationService.class);
+ private static final String UPDATED_CONFIG_FILE_NAME =
"config-updated.yml";
+
+ private final Differentiator<ByteBuffer> differentiator;
+ private final RunMiNiFi runMiNiFi;
+ private final ConfigurationChangeListener
miNiFiConfigurationChangeListener;
+ private final BootstrapFileProvider bootstrapFileProvider;
+
+ public UpdateConfigurationService(RunMiNiFi runMiNiFi,
ConfigurationChangeListener miNiFiConfigurationChangeListener,
BootstrapFileProvider bootstrapFileProvider) {
+ this.differentiator =
WholeConfigDifferentiator.getByteBufferDifferentiator();
+ this.differentiator.initialize(runMiNiFi);
+ this.runMiNiFi = runMiNiFi;
+ this.miNiFiConfigurationChangeListener =
miNiFiConfigurationChangeListener;
+ this.bootstrapFileProvider = bootstrapFileProvider;
+ }
+
+ public Optional<MiNiFiCommandState> handleUpdate() {
+ logger.debug("Handling configuration update");
+ Optional<MiNiFiCommandState> commandState = Optional.empty();
+ try (FileInputStream configFile = new
FileInputStream(getConfigFilePath().toFile())) {
+ ByteBuffer readOnlyNewConfig =
ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+ IOUtils.toByteArray(configFile),
runMiNiFi.getConfigFileReference().get().duplicate(),
bootstrapFileProvider.getBootstrapProperties());
+ if (differentiator.isNew(readOnlyNewConfig)) {
+ miNiFiConfigurationChangeListener.handleChange(new
ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+ } else {
+ logger.info("The given configuration does not contain any
update. No operation required");
+ commandState = Optional.of(MiNiFiCommandState.NO_OPERATION);
+ }
+ } catch (Exception e) {
+ commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+ logger.error("Could not handle configuration update", e);
+ }
+ return commandState;
+ }
+
+ private Path getConfigFilePath() {
+ return ofNullable(safeGetPropertiesFilePath())
+ .map(File::new)
+ .map(File::getParent)
+ .map(parentDir -> new File(parentDir + UPDATED_CONFIG_FILE_NAME))
+ .orElse(new File("./conf/" + UPDATED_CONFIG_FILE_NAME)).toPath();
Review Comment:
Isn't /conf available as a constant somewhere so we wouldn't need to hard
code it this deeply.
##########
c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+ protected static final String ID = "id";
+ protected static final Map<String, String> ARGS =
Collections.singletonMap("key", "value");
+ @Mock
+ private OperandPropertiesProvider operandPropertiesProvider;
+
+ @Mock
+ private Function<Map<String, String>, Boolean> persistProperties;
+
+ @InjectMocks
+ private UpdatePropertiesOperationHandler updatePropertiesOperationHandler;
+
+ @Test
+ void shouldReturnStaticSettings() {
+ assertEquals(UPDATE,
updatePropertiesOperationHandler.getOperationType());
+ assertEquals(PROPERTIES,
updatePropertiesOperationHandler.getOperandType());
+ assertTrue(updatePropertiesOperationHandler.requiresRestart());
+ }
+
+ @Test
+ void shouldReturnProperties() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("test", new Object());
+ when(operandPropertiesProvider.getProperties()).thenReturn(properties);
+
+ Map<String, Object> result =
updatePropertiesOperationHandler.getProperties();
+
+ assertEquals(properties, result);
+ }
+
+ @Test
+ void shouldReturnAckWithFullyAppliedWhenPersistIsSuccessful() {
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenReturn(true);
+
+ C2OperationAck result =
updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.FULLY_APPLIED), result);
+ }
+
+ @Test
+ void shouldReturnAckWithNoOperationWhenPersistReturnFalse() {
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenReturn(false);
+
+ C2OperationAck result =
updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.NO_OPERATION), result);
+ }
+
+ @Test
+ void shouldReturnNotAppliedInCaseOfIllegalArgumentException() {
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenThrow(new
IllegalArgumentException());
+
+ C2OperationAck result =
updatePropertiesOperationHandler.handle(c2Operation);
+
+ assertEquals(getExpected(OperationState.NOT_APPLIED), result);
+ }
+
+ @Test
+ void shouldReturnNotAppliedInCaseOfException() {
+ C2Operation c2Operation = getC2Operation();
+ when(persistProperties.apply(ARGS)).thenThrow(new RuntimeException());
+
+ C2OperationAck result =
updatePropertiesOperationHandler.handle(c2Operation);
+
+ C2OperationAck expected = getExpected(OperationState.NOT_APPLIED);
+ expected.getOperationState().setDetails("Failed to persist
properties");
+ assertEquals(expected, result);
+ }
+ private C2OperationAck getExpected(OperationState operationState) {
Review Comment:
An empty line would be nice.
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
+import static
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.nifi.minifi.bootstrap.MiNiFiCommandState;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.slf4j.Logger;
+
+public class UpdatePropertiesService {
+ private final RunMiNiFi runner;
+ private final Logger logger;
+ private final BootstrapFileProvider bootstrapFileProvider;
+
+ public UpdatePropertiesService(RunMiNiFi runner, Logger logger,
BootstrapFileProvider bootstrapFileProvider) {
+ this.runner = runner;
+ this.logger = logger;
+ this.bootstrapFileProvider = bootstrapFileProvider;
+ }
+
+ public Optional<MiNiFiCommandState> handleUpdate() {
+ Optional<MiNiFiCommandState> commandState;
+ try {
+ File bootstrapConfigFile =
BootstrapFileProvider.getBootstrapConfFile();
+
+ File bootstrapSwapConfigFile =
bootstrapFileProvider.getBootstrapConfSwapFile();
+ logger.info("Persisting old bootstrap configuration to {}",
bootstrapSwapConfigFile.getAbsolutePath());
+
+ try (FileInputStream configFileInputStream = new
FileInputStream(bootstrapConfigFile)) {
+ Files.copy(configFileInputStream,
bootstrapSwapConfigFile.toPath(), REPLACE_EXISTING);
+ }
+
+
Files.copy(bootstrapFileProvider.getBootstrapConfNewFile().toPath(),
bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+
+ // already from new
+ commandState =
generateConfigfilesBasedOnNewProperties(bootstrapConfigFile,
bootstrapSwapConfigFile, bootstrapFileProvider.getBootstrapProperties());
+ } catch (Exception e) {
+ commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+ logger.error("Failed to load new bootstrap properties", e);
+ }
+ return commandState;
+ }
+
+ private Optional<MiNiFiCommandState>
generateConfigfilesBasedOnNewProperties(File bootstrapConfigFile, File
bootstrapSwapConfigFile, Properties bootstrapProperties)
+ throws IOException, ConfigurationChangeException {
+ Optional<MiNiFiCommandState> commandState = Optional.empty();
+ try {
+ ByteBuffer byteBuffer =
generateConfigFiles(asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()),
+ bootstrapProperties.getProperty(CONF_DIR_KEY),
bootstrapProperties);
+ runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+ restartInstance();
+ } catch (Exception e) {
+ commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED);
+ // reverting config file
+ try (FileInputStream swapConfigFileStream = new
FileInputStream(bootstrapSwapConfigFile)) {
+ Files.copy(swapConfigFileStream, bootstrapConfigFile.toPath(),
REPLACE_EXISTING);
+ }
+ // read reverted properties
+ bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+
+ ByteBuffer byteBuffer = generateConfigFiles(
+
asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()),
bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+ runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+
+ logger.debug("Transformation of new config file failed after swap
file was created, deleting it.");
+ if (!bootstrapSwapConfigFile.delete()) {
+ logger.warn("The swap file failed to delete after a failed
handling of a change. It should be cleaned up manually.");
Review Comment:
We could log the swap file location to be deleted as well.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+ protected static final String REQUESTED_OPERATIONS_FILE_NAME =
"requestedOperations.data";
+
+ private final ObjectMapper objectMapper;
+ private final File requestedOperationsFile;
+
+ public FileBasedRequestedOperationDAO(String runDir, ObjectMapper
objectMapper) {
+ this.requestedOperationsFile = new File(runDir,
REQUESTED_OPERATIONS_FILE_NAME);
+ this.objectMapper = objectMapper;
+ }
+
+ public void save(OperationQueue operationQueue) {
+ LOGGER.info("Saving C2 operations to file");
+ LOGGER.debug("C2 Operation Queue: {}", operationQueue);
+ try {
+ objectMapper.writeValue(requestedOperationsFile, operationQueue);
+ } catch (Exception e) {
+ LOGGER.error("Failed to save requested c2 operations", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Optional<OperationQueue> get() {
+ LOGGER.info("Reading queued c2 operations from file");
+ if (requestedOperationsFile.exists()) {
+ try {
+ OperationQueue operationQueue =
objectMapper.readValue(requestedOperationsFile, OperationQueue.class);
+ LOGGER.debug("Queued operations: {}", operationQueue);
+ return Optional.of(operationQueue);
+ } catch (Exception e) {
+ LOGGER.error("Failed to read queued operations file", e);
+ }
+ } else {
+ LOGGER.info("There is no queued c2 operation");
+ }
+ return Optional.empty();
+ }
+
+ public void reset() {
+ if(requestedOperationsFile.exists() &&
!requestedOperationsFile.delete()) {
+ LOGGER.error("Failed to delete requested operations file");
Review Comment:
We could also log the location of the file and also say that it should be
deleted manually or later operations potentially will be lost as it won't be
persisted. Regardless some messaging with urgency would be needed here I think.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
Review Comment:
We could extract this to class level or at least outside of the loop :)
Maybe it could be renamed to sleepIncrement or similar.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static
org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PropertiesPersister.class);
+ private static final String VALID = "VALID";
+ private static final String EQUALS_SIGN = "=";
+ private static final String HASHMARK_SIGN = "#";
+
+ private final UpdatePropertiesPropertyProvider
updatePropertiesPropertyProvider;
+ private final AgentPropertyValidationContext validationContext;
+ private final File bootstrapFile;
+ private final File bootstrapNewFile;
+
+ public PropertiesPersister(UpdatePropertiesPropertyProvider
updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+ this.updatePropertiesPropertyProvider =
updatePropertiesPropertyProvider;
+ this.validationContext = new AgentPropertyValidationContext();
+ this.bootstrapFile = new File(bootstrapConfigFileLocation);
+ this.bootstrapNewFile = new File(bootstrapFile.getParentFile() +
"/bootstrap-updated.conf");
Review Comment:
We will need a shared constant to reference "bootstrap-updated.conf"
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static
org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PropertiesPersister.class);
+ private static final String VALID = "VALID";
+ private static final String EQUALS_SIGN = "=";
+ private static final String HASHMARK_SIGN = "#";
+
+ private final UpdatePropertiesPropertyProvider
updatePropertiesPropertyProvider;
+ private final AgentPropertyValidationContext validationContext;
+ private final File bootstrapFile;
+ private final File bootstrapNewFile;
+
+ public PropertiesPersister(UpdatePropertiesPropertyProvider
updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+ this.updatePropertiesPropertyProvider =
updatePropertiesPropertyProvider;
+ this.validationContext = new AgentPropertyValidationContext();
+ this.bootstrapFile = new File(bootstrapConfigFileLocation);
+ this.bootstrapNewFile = new File(bootstrapFile.getParentFile() +
"/bootstrap-updated.conf");
+ }
+
+ public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+ int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+ if (propertyCountToUpdate == 0) {
+ return false;
+ }
+ Set<String> propertiesToUpdateKeys = new
HashSet<>(propertiesToUpdate.keySet());
+
+ Set<String> updatedProperties = new HashSet<>();
+ try (BufferedReader reader = new BufferedReader(new
FileReader(bootstrapFile));
+ BufferedWriter bufferedWriter = new BufferedWriter(new
FileWriter(bootstrapNewFile, false))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ for (String key : propertiesToUpdateKeys) {
+ String prefix = key + EQUALS_SIGN;
+ if (line.startsWith(prefix) ||
line.startsWith(HASHMARK_SIGN + prefix)) {
+ line = prefix + propertiesToUpdate.get(key);
+ updatedProperties.add(key);
+ }
+ }
+ bufferedWriter.write(line + System.lineSeparator());
+ }
+
+ // add new properties which has no values before
+ propertiesToUpdateKeys.removeAll(updatedProperties);
+ for (String key : propertiesToUpdateKeys) {
+ bufferedWriter.write(key + EQUALS_SIGN +
propertiesToUpdate.get(key) + System.lineSeparator());
+ }
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return true;
+ }
+
+ private int validateProperties(Map<String, String> propertiesToUpdate) {
+ Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>)
updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+ Map<String, UpdatableProperty> updatablePropertyMap =
updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName,
Function.identity()));
+ int propertyCountToUpdate = 0;
+ for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+ UpdatableProperty updatableProperty =
updatablePropertyMap.get(entry.getKey());
+ if (updatableProperty == null) {
+ throw new IllegalArgumentException("You can not update the
requested property through C2 protocol");
+ }
+ if (!Objects.equals(updatableProperty.getPropertyValue(),
entry.getValue())) {
+ if (!getValidator(updatableProperty.getValidator())
+ .map(validator -> validator.validate(entry.getKey(),
entry.getValue(), validationContext))
+ .map(ValidationResult::isValid)
+ .orElse(true)) {
+ throw new IllegalArgumentException(String.format("Invalid
value for %s", entry.getKey()));
Review Comment:
With this fail fast approach there could be multiple retries while
everything is sorted out what do you think about collecting all the issues and
throwing exception only at the ned with all the relevant information?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesPropertyProvider implements
OperandPropertiesProvider {
Review Comment:
nice name :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]