briansolo1985 commented on code in PR #6733:
URL: https://github.com/apache/nifi/pull/6733#discussion_r1061435071
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
Review Comment:
We shouldn't serialize c2Operation and pass it to sendCommand, as it's not
used anymore.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response)
{
}
}
- private void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart()
+ && !Optional.ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(state -> !FULLY_APPLIED.equals(state))
+ .isPresent();
+ }
+
+ private Optional<C2OperationAck> handleOperation(C2OperationHandler
c2OperationHandler, C2Operation requestedOperation, List<C2Operation>
requestedOperations) {
+ C2OperationAck c2OperationAck =
c2OperationHandler.handle(requestedOperation);
+ if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+ handleRestartableOperation(requestedOperations,
requestedOperation);
+ return Optional.empty();
+ } else {
+ return Optional.of(c2OperationAck);
+ }
+ }
+
+ private void handleRestartableOperation(List<C2Operation>
remainingOperations, C2Operation requestedOperation) {
+ // need to stop heartbeats till the restart happens
+ heartbeatLocked = true;
Review Comment:
We should introduce a disableHeartbeat method so it would be a consistent
notation
(heartbeatLocked = false; is enableHeartbeat everywhere)
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void registerAcknowledgeTimeoutTask() {
+ bootstrapAcknowledgeExecutorService.schedule(() -> {
+ if (!ackReceived) {
+ LOGGER.info("Does not received acknowledge from bootstrap
after {} seconds. Handling remaining operations.",
MINIFI_RESTART_TIMEOUT_SECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ }
+ }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ private void acknowledgeHandler(String[] params) {
+ LOGGER.info("Received acknowledge message from bootstrap process");
+ if (params.length < 1) {
+ LOGGER.error("Invalid arguments coming from bootstrap, skipping
acknowledging latest operation");
+ return;
+ }
+
+ Optional<OperationQueue> optionalOperationQueue =
requestedOperationDAO.get();
+ ackReceived = true;
+ optionalOperationQueue.ifPresent(operationQueue -> {
+ C2Operation c2Operation = operationQueue.getCurrentOperation();
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(c2Operation.getIdentifier());
+ C2OperationState c2OperationState = new C2OperationState();
+ OperationState state = OperationState.valueOf(params[0]);
+ c2OperationState.setState(state);
+ c2OperationAck.setOperationState(c2OperationState);
+ c2ClientService.sendAcknowledge(c2OperationAck);
+ if (state != OperationState.FULLY_APPLIED) {
Review Comment:
Can we make a log here that this means that bootstrap did not restart the
process as the bootstrap-side operation has failed?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void registerAcknowledgeTimeoutTask() {
+ bootstrapAcknowledgeExecutorService.schedule(() -> {
Review Comment:
Do we have any safety valve here to prevent bootstrap from accidentally
restarting MiNiFi even after the timeout?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -109,17 +131,25 @@ public C2NifiClientService(final NiFiProperties
niFiProperties, final FlowContro
OperandPropertiesProvider emptyOperandPropertiesProvider = new
EmptyOperandPropertiesProvider();
TransferDebugCommandHelper transferDebugCommandHelper = new
TransferDebugCommandHelper(niFiProperties);
UpdateAssetCommandHelper updateAssetCommandHelper = new
UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory());
+ objectMapper = new ObjectMapper();
updateAssetCommandHelper.createAssetDirectory();
- C2OperationService c2OperationService = new
C2OperationService(Arrays.asList(
+ this.bootstrapCommunicator = bootstrapCommunicator;
+ requestedOperationDAO = new
FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir",
"bin"), objectMapper);
+ String bootstrapConfigFileLocation =
niFiProperties.getProperty("nifi.minifi.bootstrap.file");
+ updatePropertiesPropertyProvider = new
UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+ propertiesPersister = new
PropertiesPersister(updatePropertiesPropertyProvider,
bootstrapConfigFileLocation);
+ C2OperationHandlerProvider c2OperationHandlerProvider = new
C2OperationHandlerProvider(Arrays.asList(
new UpdateConfigurationOperationHandler(client, flowIdHolder,
this::updateFlowContent, emptyOperandPropertiesProvider),
new DescribeManifestOperationHandler(heartbeatFactory,
this::generateRuntimeInfo, emptyOperandPropertiesProvider),
TransferDebugOperationHandler.create(client,
emptyOperandPropertiesProvider,
transferDebugCommandHelper.debugBundleFiles(),
transferDebugCommandHelper::excludeSensitiveText),
UpdateAssetOperationHandler.create(client,
emptyOperandPropertiesProvider,
- updateAssetCommandHelper::assetUpdatePrecondition,
updateAssetCommandHelper::assetPersistFunction)
+ updateAssetCommandHelper::assetUpdatePrecondition,
updateAssetCommandHelper::assetPersistFunction),
+ new
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider,
propertiesPersister::persistProperties)
));
- this.c2ClientService = new C2ClientService(client, heartbeatFactory,
c2OperationService);
- this.supportedOperationsProvider = new
SupportedOperationsProvider(c2OperationService.getHandlers());
+ this.c2ClientService = new C2ClientService(client, heartbeatFactory,
c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
+ this.supportedOperationsProvider = new
SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
+ bootstrapCommunicator.registerMessageHandler("ACKNOWLEDGE_OPERATION",
(params, output) -> acknowledgeHandler(params));
Review Comment:
Can we move "ACKNOWLEDGE_OPERATION" to a constant please?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
Review Comment:
```suggestion
String command = Optional.ofNullable(c2Operation.getOperand())
.map(operand -> c2Operation.getOperation().name() + "_" +
operand.name())
.orElse(c2Operation.getOperation().name());
```
But it's a matter of taste :)
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -59,11 +111,42 @@ private void processResponse(C2HeartbeatResponse response)
{
}
}
- private void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- for (C2Operation requestedOperation : requestedOperations) {
- operationService.handleOperation(requestedOperation)
- .ifPresent(client::acknowledgeOperation);
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart()
+ && !Optional.ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(state -> !FULLY_APPLIED.equals(state))
+ .isPresent();
+ }
+
+ private Optional<C2OperationAck> handleOperation(C2OperationHandler
c2OperationHandler, C2Operation requestedOperation, List<C2Operation>
requestedOperations) {
+ C2OperationAck c2OperationAck =
c2OperationHandler.handle(requestedOperation);
+ if (requiresRestart(c2OperationHandler, c2OperationAck)) {
+ handleRestartableOperation(requestedOperations,
requestedOperation);
+ return Optional.empty();
+ } else {
+ return Optional.of(c2OperationAck);
+ }
+ }
+
+ private void handleRestartableOperation(List<C2Operation>
remainingOperations, C2Operation requestedOperation) {
+ // need to stop heartbeats till the restart happens
+ heartbeatLocked = true;
+
+ try {
+ requestedOperationDAO.save(new OperationQueue(requestedOperation,
remainingOperations));
+ c2OperationRegister.accept(requestedOperation);
+ } catch (Exception e) {
+ if (remainingOperations.isEmpty()) {
+ enableHeartbeat();
+ } else {
+ // reset saved queue and continue with remaining operations
+ requestedOperationDAO.reset();
+ handleRequestedOperations(remainingOperations);
Review Comment:
Maybe I'm missing something but recursive method calling is unnecessary
here. By handling the error in the caller method within the loop we could
achieve the same result without the need of recursion, and this part would
become more comprehensible (also applied some refactors to flatten the code)
```suggestion
public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
LinkedList<C2Operation> c2Operations = new
LinkedList<>(requestedOperations);
C2Operation requestedOperation;
while ((requestedOperation = c2Operations.poll()) != null) {
Optional<C2OperationHandler> c2OperationHandler =
operationService.getHandlerForOperation(requestedOperation);
if (!c2OperationHandler.isPresent()) {
logger.warn("No handler found for {} {} operation",
requestedOperation.getOperation(), requestedOperation.getOperand());
continue;
}
C2OperationHandler operationHandler = c2OperationHandler.get();
C2OperationAck c2OperationAck =
operationHandler.handle(requestedOperation);
if (requiresRestart(operationHandler, c2OperationAck) &&
initiateRestart(c2Operations, requestedOperation)) {
return;
}
sendAcknowledge(c2OperationAck);
}
enableHeartbeat();
requestedOperationDAO.reset();
}
private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
return c2OperationHandler.requiresRestart()
&& !Optional.ofNullable(c2OperationAck)
.map(C2OperationAck::getOperationState)
.map(C2OperationState::getState)
.filter(state -> !FULLY_APPLIED.equals(state))
.isPresent();
}
private boolean initiateRestart(LinkedList<C2Operation>
lRequestedOperations, C2Operation requestedOperation) {
try {
// need to stop heartbeats till the restart happens
heartbeatLocked = true;
requestedOperationDAO.save(new
OperationQueue(requestedOperation, lRequestedOperations));
c2OperationRegister.accept(requestedOperation);
return true;
} catch (Exception e) {
logger.error("Failed to initiate restart. Dropping operation and
continue with remaining operations", e);
enableHeartbeat();
// reset saved queue and continue with remaining operations
requestedOperationDAO.reset();
}
return false;
}
```
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
+ private final RequestedOperationDAO requestedOperationDAO;
+ private final Consumer<C2Operation> c2OperationRegister;
+ private volatile boolean heartbeatLocked = false;
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationService operationService) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider operationService,
+ RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
this.operationService = operationService;
+ this.requestedOperationDAO = requestedOperationDAO;
+ this.c2OperationRegister = c2OperationRegister;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ if (heartbeatLocked) {
+ logger.debug("Restart is in progress, skipping heartbeat");
+ } else {
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
Review Comment:
We should reduce the scope of the try catch block to the else block only.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java:
##########
@@ -32,23 +42,65 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationService operationService;
+ private final C2OperationHandlerProvider operationService;
Review Comment:
We should rename the member according to its type
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
Review Comment:
We could change order here, and pass the command string to
registerAcknowledgeTimeoutTask and log it there in case of timeout. That could
help debugging as we could see which command's ack has timed out without
scrolling the logs back.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
Review Comment:
This could be a constant or should be defined outside the loop
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void registerAcknowledgeTimeoutTask() {
+ bootstrapAcknowledgeExecutorService.schedule(() -> {
+ if (!ackReceived) {
+ LOGGER.info("Does not received acknowledge from bootstrap
after {} seconds. Handling remaining operations.",
MINIFI_RESTART_TIMEOUT_SECONDS);
Review Comment:
Can we add that "Operation requiring restart has failed, and no restart has
happened"?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void registerAcknowledgeTimeoutTask() {
+ bootstrapAcknowledgeExecutorService.schedule(() -> {
+ if (!ackReceived) {
+ LOGGER.info("Does not received acknowledge from bootstrap
after {} seconds. Handling remaining operations.",
MINIFI_RESTART_TIMEOUT_SECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ }
+ }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ private void acknowledgeHandler(String[] params) {
+ LOGGER.info("Received acknowledge message from bootstrap process");
+ if (params.length < 1) {
+ LOGGER.error("Invalid arguments coming from bootstrap, skipping
acknowledging latest operation");
+ return;
+ }
+
+ Optional<OperationQueue> optionalOperationQueue =
requestedOperationDAO.get();
+ ackReceived = true;
+ optionalOperationQueue.ifPresent(operationQueue -> {
Review Comment:
At this point, if we can't load the operationQueue, it should be considered an
error, right? Should we log it at least?
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
+ }
+
+ private void registerOperation(C2Operation c2Operation) {
+ try {
+ ackReceived = false;
+ registerAcknowledgeTimeoutTask();
+ String command = c2Operation.getOperation().name() +
(c2Operation.getOperand() != null ? "_" + c2Operation.getOperand().name() : "");
+ bootstrapCommunicator.sendCommand(command,
objectMapper.writeValueAsString(c2Operation));
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void registerAcknowledgeTimeoutTask() {
+ bootstrapAcknowledgeExecutorService.schedule(() -> {
+ if (!ackReceived) {
+ LOGGER.info("Does not received acknowledge from bootstrap
after {} seconds. Handling remaining operations.",
MINIFI_RESTART_TIMEOUT_SECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ }
+ }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ private void acknowledgeHandler(String[] params) {
+ LOGGER.info("Received acknowledge message from bootstrap process");
+ if (params.length < 1) {
+ LOGGER.error("Invalid arguments coming from bootstrap, skipping
acknowledging latest operation");
+ return;
+ }
+
+ Optional<OperationQueue> optionalOperationQueue =
requestedOperationDAO.get();
+ ackReceived = true;
+ optionalOperationQueue.ifPresent(operationQueue -> {
+ C2Operation c2Operation = operationQueue.getCurrentOperation();
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(c2Operation.getIdentifier());
+ C2OperationState c2OperationState = new C2OperationState();
+ OperationState state = OperationState.valueOf(params[0]);
+ c2OperationState.setState(state);
+ c2OperationAck.setOperationState(c2OperationState);
+ c2ClientService.sendAcknowledge(c2OperationAck);
+ if (state != OperationState.FULLY_APPLIED) {
+ handleOngoingOperations(optionalOperationQueue);
Review Comment:
On second thought, state != OperationState.FULLY_APPLIED can also mean that
the restart happened, but MiNiFi was unable to start and bootstrap had to revert
to the previous state, right?
In this case, can we distinguish between the states?
We only need to call handleOngoingOperations when bootstrap did not restart
MiNiFi.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
+ LOGGER.info("Handling ongoing operations: {}", operationQueue);
+ if (operationQueue.isPresent()) {
+ try {
+ waitForAcknowledgeFromBootstrap();
+
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process c2 operations queue", e);
+ c2ClientService.enableHeartbeat();
+ }
+ } else {
+ c2ClientService.enableHeartbeat();
+ }
+ }
+
+ private void waitForAcknowledgeFromBootstrap() {
+ LOGGER.info("Waiting for ACK signal from Bootstrap");
+ int currentWaitTime = 0;
+ while(!ackReceived) {
+ int sleep = 1000;
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
+ }
+ currentWaitTime += sleep;
+ if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+ LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+ break;
+ }
+ }
Review Comment:
We should log that the ack has been successfully received.
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.util.Optional;
+
+public interface RequestedOperationDAO {
+
+ /**
+ * Persist the given requested operation list
+ * @param operationQueue the queue containing the current and remaining
operations
+ */
+ void save(OperationQueue operationQueue);
+
+ /**
+ * Returns the saved Operations
+ *
+ * @return the C2 Operations queue with the actual operation
+ */
+ Optional<OperationQueue> get();
Review Comment:
Minor, but maybe we should call this load(), as the counterpart of save().
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java:
##########
@@ -152,15 +185,94 @@ private C2ClientConfig
generateClientConfig(NiFiProperties properties) {
}
public void start() {
- scheduledExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ handleOngoingOperations(requestedOperationDAO.get());
+ heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleOngoingOperations(Optional<OperationQueue>
operationQueue) {
Review Comment:
I think we can get rid of the synchronized block if we call this method from
acknowledgeHandler only when a restart has not happened.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]