This is an automated email from the ASF dual-hosted git repository.
bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 52e257e16c NIFI-13123 MiNiFi async C2 operation processing
52e257e16c is described below
commit 52e257e16c6d86cde377ba0063ac268fd9ac6129
Author: Ferenc Kis <[email protected]>
AuthorDate: Thu May 2 16:03:24 2024 +0200
NIFI-13123 MiNiFi async C2 operation processing
Signed-off-by: Csaba Bejan <[email protected]>
This closes #8738.
---
.../org/apache/nifi/c2/client/C2ClientConfig.java | 12 +
.../nifi/c2/client/service/C2ClientService.java | 149 -----------
.../nifi/c2/client/service/C2HeartbeatFactory.java | 98 ++++----
.../nifi/c2/client/service/C2HeartbeatManager.java | 79 ++++++
.../nifi/c2/client/service/C2OperationManager.java | 204 ++++++++++++++++
...tionDAO.java => C2OperationRestartHandler.java} | 26 +-
.../client/service/operation/OperationQueue.java | 17 +-
...tedOperationDAO.java => OperationQueueDAO.java} | 2 +-
.../c2/client/service/C2ClientServiceTest.java | 271 ---------------------
.../c2/client/service/C2HeartbeatManagerTest.java | 141 +++++++++++
.../c2/client/service/C2OperationManagerTest.java | 248 +++++++++++++++++++
.../nifi/minifi/commons/api/MiNiFiProperties.java | 1 +
.../c2/BootstrapC2OperationRestartHandler.java | 120 +++++++++
.../apache/nifi/minifi/c2/C2NifiClientService.java | 204 +++++-----------
...ionDAO.java => FileBasedOperationQueueDAO.java} | 13 +-
.../c2/BootstrapC2OperationRestartHandlerTest.java | 170 +++++++++++++
...st.java => FileBasedOperationQueueDAOTest.java} | 11 +-
.../nifi/minifi/bootstrap/BootstrapListener.java | 17 +-
.../nifi/bootstrap/BootstrapCommunicator.java | 6 +-
.../org/apache/nifi/bootstrap/CommandResult.java | 30 +--
20 files changed, 1139 insertions(+), 680 deletions(-)
diff --git
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index 3171280f99..d9cd3aa90f 100644
---
a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++
b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -57,6 +57,7 @@ public class C2ClientConfig {
private final long keepAliveDuration;
private final String c2RequestCompression;
private final String c2AssetDirectory;
+ private final long bootstrapAcknowledgeTimeout;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@@ -86,6 +87,7 @@ public class C2ClientConfig {
this.keepAliveDuration = builder.keepAliveDuration;
this.c2RequestCompression = builder.c2RequestCompression;
this.c2AssetDirectory = builder.c2AssetDirectory;
+ this.bootstrapAcknowledgeTimeout = builder.bootstrapAcknowledgeTimeout;
}
public String getC2Url() {
@@ -196,6 +198,10 @@ public class C2ClientConfig {
return keepAliveDuration;
}
+ public long getBootstrapAcknowledgeTimeout() {
+ return bootstrapAcknowledgeTimeout;
+ }
+
/**
* Builder for client configuration.
*/
@@ -231,6 +237,7 @@ public class C2ClientConfig {
private long keepAliveDuration;
private String c2RequestCompression;
private String c2AssetDirectory;
+ private long bootstrapAcknowledgeTimeout;
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
@@ -377,6 +384,11 @@ public class C2ClientConfig {
return this;
}
+ public Builder bootstrapAcknowledgeTimeout(long
bootstrapAcknowledgeTimeout) {
+ this.bootstrapAcknowledgeTimeout = bootstrapAcknowledgeTimeout;
+ return this;
+ }
+
public C2ClientConfig build() {
return new C2ClientConfig(this);
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
deleted file mode 100644
index e8f0718cbd..0000000000
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.c2.client.service;
-
-import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Consumer;
-import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
-import org.apache.nifi.c2.protocol.api.C2Heartbeat;
-import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class C2ClientService {
-
- private static final Logger logger =
LoggerFactory.getLogger(C2ClientService.class);
-
- private final C2Client client;
- private final C2HeartbeatFactory c2HeartbeatFactory;
- private final C2OperationHandlerProvider c2OperationHandlerProvider;
- private final RequestedOperationDAO requestedOperationDAO;
- private final Consumer<C2Operation> c2OperationRegister;
- private volatile boolean heartbeatLocked = false;
-
- public C2ClientService(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider,
- RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation>
c2OperationRegister) {
- this.client = client;
- this.c2HeartbeatFactory = c2HeartbeatFactory;
- this.c2OperationHandlerProvider = c2OperationHandlerProvider;
- this.requestedOperationDAO = requestedOperationDAO;
- this.c2OperationRegister = c2OperationRegister;
- }
-
- public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
- if (heartbeatLocked) {
- logger.debug("Heartbeats are locked, skipping sending for
now");
- } else {
- try {
- C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
-
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
- } catch (Exception e) {
- logger.error("Failed to send/process heartbeat:", e);
- }
- }
- }
-
- public void sendAcknowledge(C2OperationAck operationAck) {
- try {
- client.acknowledgeOperation(operationAck);
- } catch (Exception e) {
- logger.error("Failed to send acknowledge:", e);
- }
- }
-
- public void enableHeartbeat() {
- heartbeatLocked = false;
- }
-
- private void disableHeartbeat() {
- heartbeatLocked = true;
- }
-
- public void handleRequestedOperations(List<C2Operation>
requestedOperations) {
- LinkedList<C2Operation> c2Operations = new
LinkedList<>(requestedOperations);
- C2Operation requestedOperation;
- while ((requestedOperation = c2Operations.poll()) != null) {
- Optional<C2OperationHandler> c2OperationHandler =
c2OperationHandlerProvider.getHandlerForOperation(requestedOperation);
- if (!c2OperationHandler.isPresent()) {
- continue;
- }
- C2OperationHandler operationHandler = c2OperationHandler.get();
- C2OperationAck c2OperationAck =
operationHandler.handle(requestedOperation);
- if (requiresRestart(operationHandler, c2OperationAck)) {
- if (initiateRestart(c2Operations, requestedOperation)) {
- return;
- }
- C2OperationState c2OperationState = new C2OperationState();
- c2OperationState.setState(OperationState.NOT_APPLIED);
- c2OperationAck.setOperationState(c2OperationState);
- }
- sendAcknowledge(c2OperationAck);
- }
- enableHeartbeat();
- requestedOperationDAO.cleanup();
- }
-
- private void processResponse(C2HeartbeatResponse response) {
- List<C2Operation> requestedOperations =
response.getRequestedOperations();
- if (requestedOperations != null && !requestedOperations.isEmpty()) {
- logger.info("Received {} operations from the C2 server",
requestedOperations.size());
- handleRequestedOperations(requestedOperations);
- } else {
- logger.trace("No operations received from the C2 server in the
server. Nothing to do.");
- }
- }
-
- private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
- return c2OperationHandler.requiresRestart() &&
isOperationFullyApplied(c2OperationAck);
- }
-
- private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
- return Optional.ofNullable(c2OperationAck)
- .map(C2OperationAck::getOperationState)
- .map(C2OperationState::getState)
- .filter(FULLY_APPLIED::equals)
- .isPresent();
- }
-
- private boolean initiateRestart(LinkedList<C2Operation>
requestedOperations, C2Operation requestedOperation) {
- try {
- disableHeartbeat();
- requestedOperationDAO.save(new OperationQueue(requestedOperation,
requestedOperations));
- c2OperationRegister.accept(requestedOperation);
- return true;
- } catch (Exception e) {
- logger.error("Failed to initiate restart. Dropping operation and
continue with remaining operations", e);
- requestedOperationDAO.cleanup();
- }
- return false;
- }
-
-}
-
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
index c939ab78ab..b5d9e9824e 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java
@@ -17,6 +17,13 @@
package org.apache.nifi.c2.client.service;
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static java.util.Collections.list;
+import static java.util.Comparator.comparing;
+import static java.util.Comparator.comparingInt;
+import static java.util.Map.entry;
+import static java.util.Objects.nonNull;
+import static java.util.stream.Collectors.toSet;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.File;
@@ -25,13 +32,12 @@ import java.lang.management.OperatingSystemMXBean;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.net.SocketException;
import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.PersistentUuidGenerator;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@@ -137,8 +143,7 @@ public class C2HeartbeatFactory {
}
private DeviceInfo generateDeviceInfo() {
- // Populate DeviceInfo
- final DeviceInfo deviceInfo = new DeviceInfo();
+ DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.setNetworkInfo(generateNetworkInfo());
deviceInfo.setIdentifier(getDeviceIdentifier(deviceInfo.getNetworkInfo()));
deviceInfo.setSystemInfo(generateSystemInfo());
@@ -146,67 +151,63 @@ public class C2HeartbeatFactory {
}
private NetworkInfo generateNetworkInfo() {
- NetworkInfo networkInfo = new NetworkInfo();
try {
- // Determine all interfaces
- final Enumeration<NetworkInterface> networkInterfaces =
NetworkInterface.getNetworkInterfaces();
-
- final Set<NetworkInterface> operationIfaces = new HashSet<>();
-
- // Determine eligible interfaces
- while (networkInterfaces.hasMoreElements()) {
- final NetworkInterface networkInterface =
networkInterfaces.nextElement();
- if (!networkInterface.isLoopback() && networkInterface.isUp())
{
- operationIfaces.add(networkInterface);
- }
+ Set<NetworkInterface> eligibleInterfaces =
list(getNetworkInterfaces())
+ .stream()
+ .filter(this::isEligibleInterface)
+ .collect(toSet());
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Found {} eligible interfaces with names {}",
eligibleInterfaces.size(),
+ eligibleInterfaces.stream()
+ .map(NetworkInterface::getName)
+ .collect(toSet())
+ );
}
- logger.trace("Have {} interfaces with names {}",
operationIfaces.size(),
- operationIfaces.stream()
- .map(NetworkInterface::getName)
- .collect(Collectors.toSet())
- );
-
- if (!operationIfaces.isEmpty()) {
- if (operationIfaces.size() > 1) {
- logger.debug("Instance has multiple interfaces. Generated
information may be non-deterministic.");
- }
- boolean networkInfoUnset = true;
- for (NetworkInterface networkInterface : operationIfaces) {
- Enumeration<InetAddress> inetAddresses =
networkInterface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- InetAddress inetAddress = inetAddresses.nextElement();
- // IPv4 address is preferred over IPv6 as it provides
more readable information for the user
- if (inetAddress instanceof Inet4Address) {
- updateNetworkInfo(networkInfo, networkInterface,
inetAddress);
- return networkInfo;
- }
- if (networkInfoUnset) {
- updateNetworkInfo(networkInfo, networkInterface,
inetAddress);
- networkInfoUnset = false;
- }
- }
- }
- }
+ Comparator<Map.Entry<NetworkInterface, InetAddress>>
orderByIp4AddressesFirst = comparingInt(item -> item.getValue() instanceof
Inet4Address ? 0 : 1);
+ Comparator<Map.Entry<NetworkInterface, InetAddress>>
orderByNetworkInterfaceName = comparing(entry -> entry.getKey().getName());
+ return eligibleInterfaces.stream()
+ .flatMap(networkInterface ->
list(networkInterface.getInetAddresses())
+ .stream()
+ .map(inetAddress -> entry(networkInterface, inetAddress)))
+
.sorted(orderByIp4AddressesFirst.thenComparing(orderByNetworkInterfaceName))
+ .findFirst()
+ .map(entry -> createNetworkInfo(entry.getKey(),
entry.getValue()))
+ .orElseGet(NetworkInfo::new);
} catch (Exception e) {
logger.error("Network Interface processing failed", e);
+ return new NetworkInfo();
}
- return networkInfo;
}
- private void updateNetworkInfo(NetworkInfo networkInfo, NetworkInterface
networkInterface, InetAddress inetAddress) {
+ private boolean isEligibleInterface(NetworkInterface networkInterface) {
+ try {
+ return !networkInterface.isLoopback()
+ && !networkInterface.isVirtual()
+ && networkInterface.isUp()
+ && nonNull(networkInterface.getHardwareAddress());
+ } catch (SocketException e) {
+ logger.warn("Error processing network interface", e);
+ return false;
+ }
+ }
+
+ private NetworkInfo createNetworkInfo(NetworkInterface networkInterface,
InetAddress inetAddress) {
+ NetworkInfo networkInfo = new NetworkInfo();
networkInfo.setDeviceId(networkInterface.getName());
networkInfo.setHostname(inetAddress.getHostName());
networkInfo.setIpAddress(inetAddress.getHostAddress());
+ return networkInfo;
}
private String getDeviceIdentifier(NetworkInfo networkInfo) {
if (deviceId == null) {
if (networkInfo.getDeviceId() != null) {
try {
- final NetworkInterface netInterface =
NetworkInterface.getByName(networkInfo.getDeviceId());
+ NetworkInterface netInterface =
NetworkInterface.getByName(networkInfo.getDeviceId());
byte[] hardwareAddress = netInterface.getHardwareAddress();
- final StringBuilder macBuilder = new StringBuilder();
+ StringBuilder macBuilder = new StringBuilder();
if (hardwareAddress != null) {
for (byte address : hardwareAddress) {
macBuilder.append(String.format("%02X", address));
@@ -221,7 +222,6 @@ public class C2HeartbeatFactory {
deviceId = getConfiguredDeviceId();
}
}
-
return deviceId;
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
new file mode 100644
index 0000000000..a3729255ce
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.ofNullable;
+import static java.util.function.Predicate.not;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2HeartbeatManager implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(C2HeartbeatManager.class);
+
+ private final C2Client client;
+ private final C2HeartbeatFactory c2HeartbeatFactory;
+ private final ReentrantLock heartbeatLock;
+ private final RuntimeInfoWrapper runtimeInfoWrapper;
+ private final C2OperationManager c2OperationManager;
+
+ public C2HeartbeatManager(C2Client client, C2HeartbeatFactory
c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper
runtimeInfoWrapper,
+ C2OperationManager c2OperationManager) {
+ this.client = client;
+ this.c2HeartbeatFactory = c2HeartbeatFactory;
+ this.heartbeatLock = heartbeatLock;
+ this.runtimeInfoWrapper = runtimeInfoWrapper;
+ this.c2OperationManager = c2OperationManager;
+ }
+
+ @Override
+ public void run() {
+ if (!heartbeatLock.tryLock()) {
+ LOGGER.debug("Heartbeat lock is hold by another thread, skipping
heartbeat sending");
+ return;
+ }
+ try {
+ LOGGER.debug("Heartbeat lock is acquired, sending heartbeat");
+ C2Heartbeat c2Heartbeat =
c2HeartbeatFactory.create(runtimeInfoWrapper);
+
client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+ } catch (Exception e) {
+ LOGGER.error("Failed to send/process heartbeat", e);
+ } finally {
+ heartbeatLock.unlock();
+ LOGGER.debug("Heartbeat unlocked lock and heartbeat is sent
successfully");
+ }
+ }
+
+ private void processResponse(C2HeartbeatResponse response) {
+ ofNullable(response.getRequestedOperations())
+ .filter(not(List::isEmpty))
+ .ifPresentOrElse(operations -> {
+ LOGGER.info("Received {} operations from the C2 server",
operations.size());
+ operations.forEach(c2OperationManager::add);
+ },
+ () -> LOGGER.debug("No operations received from the C2 server")
+ );
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
new file mode 100644
index 0000000000..85d9d2d9a0
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2OperationManager.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static java.util.function.Predicate.not;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class C2OperationManager implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(C2OperationManager.class);
+
+ private final C2Client client;
+ private final C2OperationHandlerProvider c2OperationHandlerProvider;
+ private final ReentrantLock heartbeatLock;
+ private final OperationQueueDAO operationQueueDAO;
+ private final C2OperationRestartHandler c2OperationRestartHandler;
+ private final BlockingQueue<C2Operation> c2Operations;
+
+ public C2OperationManager(C2Client client, C2OperationHandlerProvider
c2OperationHandlerProvider, ReentrantLock heartbeatLock,
+ OperationQueueDAO operationQueueDAO,
C2OperationRestartHandler c2OperationRestartHandler) {
+ this.client = client;
+ this.c2OperationHandlerProvider = c2OperationHandlerProvider;
+ this.heartbeatLock = heartbeatLock;
+ this.operationQueueDAO = operationQueueDAO;
+ this.c2OperationRestartHandler = c2OperationRestartHandler;
+ this.c2Operations = new LinkedBlockingQueue<>();
+ }
+
+ public void add(C2Operation c2Operation) {
+ try {
+ c2Operations.put(c2Operation);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread was interrupted", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ processRestartState();
+
+ while (true) {
+ C2Operation operation;
+ try {
+ operation = c2Operations.take();
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread was interrupted", e);
+ return;
+ }
+
+ LOGGER.debug("Processing operation {}", operation);
+ C2OperationHandler operationHandler =
c2OperationHandlerProvider.getHandlerForOperation(operation).orElse(null);
+ if (operationHandler == null) {
+ LOGGER.debug("No handler is present for C2 Operation {},
available handlers {}", operation, c2OperationHandlerProvider.getHandlers());
+ continue;
+ }
+
+ C2OperationAck c2OperationAck = operationHandler.handle(operation);
+ if (!requiresRestart(operationHandler, c2OperationAck)) {
+ LOGGER.debug("No restart is required. Sending ACK to C2 server
{}", c2OperationAck);
+ sendAcknowledge(c2OperationAck);
+ continue;
+ }
+
+ heartbeatLock.lock();
+ LOGGER.debug("Restart is required. Heartbeats are stopped until
restart is completed");
+ Optional<C2OperationState> restartState = initRestart(operation);
+ if (!restartState.isPresent()) {
+ LOGGER.debug("Restart in progress, stopping
C2OperationManager");
+ break;
+ }
+
+ try {
+ C2OperationState failedState = restartState.get();
+ LOGGER.debug("Restart handler returned with a failed state
{}", failedState);
+ c2OperationAck.setOperationState(failedState);
+ sendAcknowledge(c2OperationAck);
+ } finally {
+ operationQueueDAO.cleanup();
+ LOGGER.debug("Heartbeats are enabled again");
+ heartbeatLock.unlock();
+ }
+ }
+ }
+
+ private void processRestartState() {
+ Optional<OperationQueue> operationQueue = operationQueueDAO.load();
+
+ operationQueue.map(OperationQueue::getRemainingOperations)
+ .filter(not(List::isEmpty))
+ .ifPresent(this::processRemainingOperations);
+
+ operationQueue.map(OperationQueue::getCurrentOperation)
+ .ifPresentOrElse(this::processCurrentOperation,
+ () -> LOGGER.debug("No operation to acknowledge to C2
server"));
+
+ operationQueue.ifPresent(__ -> operationQueueDAO.cleanup());
+ }
+
+ private void processRemainingOperations(List<C2Operation>
remainingOperations) {
+ LOGGER.debug("Found remaining operations operations after restart.
Heartbeats are stopped until processing is completed");
+ heartbeatLock.lock();
+ try {
+ List<C2Operation> mergedOperations = new LinkedList<>();
+ mergedOperations.addAll(remainingOperations);
+ mergedOperations.addAll(c2Operations);
+ c2Operations.clear();
+ mergedOperations.forEach(c2Operations::add);
+ } catch (Exception e) {
+ LOGGER.warn("Unable to recover operations from operation queue",
e);
+ } finally {
+ heartbeatLock.unlock();
+ LOGGER.debug("Heartbeat lock released");
+ }
+ }
+
+ private void processCurrentOperation(C2Operation operation) {
+ LOGGER.debug("Found operation {} to acknowledge to C2 server",
operation);
+
+ C2OperationState c2OperationState =
c2OperationRestartHandler.waitForResponse()
+ .map(this::c2OperationState)
+ .orElse(c2OperationState(NOT_APPLIED));
+
+ C2OperationAck c2OperationAck = new C2OperationAck();
+ c2OperationAck.setOperationId(operation.getIdentifier());
+ c2OperationAck.setOperationState(c2OperationState);
+
+ sendAcknowledge(c2OperationAck);
+ }
+
+ private Optional<C2OperationState> initRestart(C2Operation operation) {
+ try {
+ LOGGER.debug("Restart initiated");
+ OperationQueue operationQueue = OperationQueue.create(operation,
c2Operations);
+ operationQueueDAO.save(operationQueue);
+ return
c2OperationRestartHandler.handleRestart(operation).map(this::c2OperationState);
+ } catch (Exception e) {
+ LOGGER.error("Failed to initiate restart. Dropping operation and
continue with remaining operations", e);
+ return of(c2OperationState(NOT_APPLIED));
+ }
+ }
+
+ private C2OperationState c2OperationState(OperationState operationState) {
+ C2OperationState c2OperationState = new C2OperationState();
+ c2OperationState.setState(operationState);
+ return c2OperationState;
+ }
+
+ private void sendAcknowledge(C2OperationAck operationAck) {
+ try {
+ client.acknowledgeOperation(operationAck);
+ } catch (Exception e) {
+ LOGGER.error("Failed to send acknowledge", e);
+ }
+ }
+
+ private boolean requiresRestart(C2OperationHandler c2OperationHandler,
C2OperationAck c2OperationAck) {
+ return c2OperationHandler.requiresRestart() &&
isOperationFullyApplied(c2OperationAck);
+ }
+
+ private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+ return ofNullable(c2OperationAck)
+ .map(C2OperationAck::getOperationState)
+ .map(C2OperationState::getState)
+ .filter(FULLY_APPLIED::equals)
+ .isPresent();
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
similarity index 61%
copy from
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
copy to
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
index 1216aa812d..c0b733946e 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationRestartHandler.java
@@ -18,28 +18,12 @@
package org.apache.nifi.c2.client.service.operation;
import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
-/**
- * The purpose of this interface is to be able to persist operations between
restarts.
- */
-public interface RequestedOperationDAO {
-
- /**
- * Persist the given requested operation list
- * @param operationQueue the queue containing the current and remaining
operations
- */
- void save(OperationQueue operationQueue);
-
- /**
- * Returns the saved Operations
- *
- * @return the C2 Operations queue with the actual operation
- */
- Optional<OperationQueue> load();
+public interface C2OperationRestartHandler {
- /**
- * Resets the saved operations
- */
- void cleanup();
+ Optional<C2OperationState.OperationState> handleRestart(C2Operation
c2Operation);
+ Optional<C2OperationState.OperationState> waitForResponse();
}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
index ae0c56c1a8..1f3b3ecb02 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
@@ -17,10 +17,12 @@
package org.apache.nifi.c2.client.service.operation;
+import static java.util.Optional.ofNullable;
+
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Queue;
import org.apache.nifi.c2.protocol.api.C2Operation;
public class OperationQueue implements Serializable {
@@ -29,12 +31,23 @@ public class OperationQueue implements Serializable {
private C2Operation currentOperation;
private List<C2Operation> remainingOperations;
+ public static OperationQueue create(C2Operation currentOperation,
Queue<C2Operation> remainingOperations) {
+ return new OperationQueue(
+ currentOperation,
+ ofNullable(remainingOperations)
+ .map(queue -> queue.stream().toList())
+ .orElseGet(List::of)
+ );
+
+ }
+
public OperationQueue() {
}
public OperationQueue(C2Operation currentOperation, List<C2Operation>
remainingOperations) {
this.currentOperation = currentOperation;
- this.remainingOperations = remainingOperations == null ?
Collections.emptyList() : remainingOperations;
+ this.remainingOperations = remainingOperations;
+
}
public C2Operation getCurrentOperation() {
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
similarity index 97%
copy from
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
copy to
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
index 1216aa812d..ec3ee18230 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++
b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueueDAO.java
@@ -22,7 +22,7 @@ import java.util.Optional;
/**
* The purpose of this interface is to be able to persist operations between
restarts.
*/
-public interface RequestedOperationDAO {
+public interface OperationQueueDAO {
/**
* Persist the given requested operation list
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
deleted file mode 100644
index 6de1f86015..0000000000
---
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.c2.client.service;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.nifi.c2.client.api.C2Client;
-import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
-import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
-import org.apache.nifi.c2.protocol.api.C2Heartbeat;
-import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class C2ClientServiceTest {
-
- @Mock
- private C2Client client;
-
- @Mock
- private C2HeartbeatFactory c2HeartbeatFactory;
-
- @Mock
- private C2OperationHandlerProvider operationService;
-
- @Mock
- private RuntimeInfoWrapper runtimeInfoWrapper;
-
- @Mock
- private RequestedOperationDAO requestedOperationDAO;
-
- @Mock
- private Consumer<C2Operation> c2OperationRegister;
-
- @InjectMocks
- private C2ClientService c2ClientService;
-
- @Test
- void testSendHeartbeatAndAckWhenOperationPresent() {
- C2Heartbeat heartbeat = mock(C2Heartbeat.class);
- when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
- C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
- final List<C2Operation> c2Operations = generateOperation(1);
- hbResponse.setRequestedOperations(c2Operations);
-
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
- when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new
C2OperationAck());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(any());
- verify(client).publishHeartbeat(heartbeat);
- verify(c2OperationHandler).handle(any());
- verify(client).acknowledgeOperation(any());
- }
-
- @Test
- void testSendHeartbeatAndAckForMultipleOperationPresent() {
- int operationNum = 5;
- C2Heartbeat heartbeat = mock(C2Heartbeat.class);
- when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
- C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
- hbResponse.setRequestedOperations(generateOperation(operationNum));
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
-
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
- when(c2OperationHandler.handle(any())).thenReturn(new
C2OperationAck());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(any());
- verify(client).publishHeartbeat(heartbeat);
- verify(c2OperationHandler, times(operationNum)).handle(any());
- verify(client, times(operationNum)).acknowledgeOperation(any());
- }
-
- @Test
- void testSendHeartbeatHandlesNoHeartbeatResponse() {
- C2Heartbeat heartbeat = mock(C2Heartbeat.class);
- when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
- when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(any());
- verify(client).publishHeartbeat(heartbeat);
- verify(client, times(0)).acknowledgeOperation(any());
- }
-
- @Test
- void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
- C2Heartbeat heartbeat = mock(C2Heartbeat.class);
- when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
- C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(any());
- verify(client).publishHeartbeat(heartbeat);
- verify(client, times(0)).acknowledgeOperation(any());
- }
-
- @Test
- void testSendHeartbeatNotAckWhenOperationAckMissing() {
- C2Heartbeat heartbeat = mock(C2Heartbeat.class);
- when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
- C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
- hbResponse.setRequestedOperations(generateOperation(1));
-
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-
when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(any());
- verify(client).publishHeartbeat(heartbeat);
- verify(client, times(0)).acknowledgeOperation(any());
- }
-
- @Test
- void shouldHeartbeatSendingNotPropagateExceptions() {
- when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new
RuntimeException());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
- }
-
- @Test
- void shouldAckSendingNotPropagateExceptions() {
- C2OperationAck operationAck = mock(C2OperationAck.class);
- doThrow(new
RuntimeException()).when(client).acknowledgeOperation(operationAck);
-
- c2ClientService.sendAcknowledge(operationAck);
- }
-
- @Test
- void
shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState()
{
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
- C2OperationAck operationAck = new C2OperationAck();
- C2OperationState c2OperationState = new C2OperationState();
- c2OperationState.setState(OperationState.NOT_APPLIED);
- operationAck.setOperationState(c2OperationState);
- when(c2OperationHandler.requiresRestart()).thenReturn(true);
-
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
-
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck);
-
- c2ClientService.handleRequestedOperations(generateOperation(1));
-
-
verify(operationService).getHandlerForOperation(any(C2Operation.class));
- verify(c2OperationHandler).handle(any(C2Operation.class));
- verify(requestedOperationDAO).cleanup();
- verify(client).acknowledgeOperation(operationAck);
- verifyNoMoreInteractions(operationService, client,
requestedOperationDAO);
- verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister);
- }
-
- @Test
- void
shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations()
{
- C2Operation c2Operation1 = new C2Operation();
- c2Operation1.setIdentifier("1");
- C2Operation c2Operation2 = new C2Operation();
- c2Operation2.setIdentifier("2");
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
- when(c2OperationHandler.requiresRestart()).thenReturn(true);
-
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
- C2OperationAck c2OperationAck = new C2OperationAck();
- C2OperationState c2OperationState = new C2OperationState();
- c2OperationState.setState(OperationState.FULLY_APPLIED);
- c2OperationAck.setOperationState(c2OperationState);
-
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
-
- c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1,
c2Operation2));
-
- verify(requestedOperationDAO).save(new OperationQueue(c2Operation1,
Collections.singletonList(c2Operation2)));
- verify(c2OperationRegister).accept(c2Operation1);
- verifyNoInteractions(client, c2HeartbeatFactory);
- verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister,
operationService);
- }
-
- @Test
- void
shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue()
{
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
- C2Operation operation = new C2Operation();
- when(c2OperationHandler.requiresRestart()).thenReturn(true);
-
when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
- C2OperationAck c2OperationAck = new C2OperationAck();
- C2OperationState c2OperationState = new C2OperationState();
- c2OperationState.setState(OperationState.FULLY_APPLIED);
- c2OperationAck.setOperationState(c2OperationState);
-
when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
- doThrow(new
RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class));
-
c2ClientService.handleRequestedOperations(Collections.singletonList(operation));
- when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new
C2Heartbeat());
-
- c2ClientService.sendHeartbeat(runtimeInfoWrapper);
-
- verify(c2HeartbeatFactory).create(runtimeInfoWrapper);
- verify(client).publishHeartbeat(any(C2Heartbeat.class));
- }
-
- @Test
- void
shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue()
{
- C2OperationHandler c2OperationHandlerForRestart =
mock(C2OperationHandler.class);
- C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
- C2Operation operation1 = new C2Operation();
- operation1.setIdentifier("1");
- C2Operation operation2 = new C2Operation();
- operation2.setIdentifier("2");
- C2OperationAck c2OperationAck = new C2OperationAck();
- C2OperationState c2OperationState = new C2OperationState();
- c2OperationState.setState(OperationState.FULLY_APPLIED);
- c2OperationAck.setOperationState(c2OperationState);
- when(c2OperationHandler.requiresRestart()).thenReturn(false);
- when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true);
-
when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart));
-
when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler));
-
when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck);
- when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck);
-
- doThrow(new
RuntimeException()).when(c2OperationRegister).accept(operation1);
-
- c2ClientService.handleRequestedOperations(Arrays.asList(operation1,
operation2));
-
- verify(client, times(2)).acknowledgeOperation(c2OperationAck);
- }
-
- private List<C2Operation> generateOperation(int num) {
- return IntStream.range(0, num)
- .mapToObj(x -> new C2Operation())
- .collect(Collectors.toList());
- }
-}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
new file mode 100644
index 0000000000..2de1483487
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2HeartbeatManagerTest {
+
+ @Mock
+ private C2Client mockC2Client;
+
+ @Mock
+ private C2HeartbeatFactory mockC2HeartbeatFactory;
+
+ @Mock
+ private ReentrantLock mockHeartbeatLock;
+
+ @Mock
+ private RuntimeInfoWrapper mockRuntimeInfoWrapper;
+
+ @Mock
+ private C2OperationManager mockC2OperationManager;
+
+ @InjectMocks
+ private C2HeartbeatManager testHeartbeatManager;
+
+ @Test
+ void shouldSkipSendingHeartbeatIfHeartbeatLockIsAcquired() {
+ when(mockHeartbeatLock.tryLock()).thenReturn(false);
+
+ testHeartbeatManager.run();
+
+ verify(mockC2HeartbeatFactory, never()).create(any());
+ verify(mockC2Client, never()).publishHeartbeat(any());
+ verify(mockC2OperationManager, never()).add(any());
+ verify(mockHeartbeatLock, never()).unlock();
+ }
+
+ @Test
+ void shouldSendHeartbeatAndProcessEmptyResponse() {
+ when(mockHeartbeatLock.tryLock()).thenReturn(true);
+ C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(empty());
+
+ testHeartbeatManager.run();
+
+ verify(mockC2HeartbeatFactory,
times(1)).create(mockRuntimeInfoWrapper);
+ verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+ verify(mockC2OperationManager, never()).add(any());
+ verify(mockHeartbeatLock, times(1)).unlock();
+
+ }
+
+ @Test
+ void shouldSendHeartbeatAndProcessResponseWithNoOperation() {
+ when(mockHeartbeatLock.tryLock()).thenReturn(true);
+ C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+ C2HeartbeatResponse mockC2HeartbeatResponse =
mock(C2HeartbeatResponse.class);
+
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of());
+
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(Optional.of(mockC2HeartbeatResponse));
+
+ testHeartbeatManager.run();
+
+ verify(mockC2HeartbeatFactory,
times(1)).create(mockRuntimeInfoWrapper);
+ verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+ verify(mockC2OperationManager, never()).add(any());
+ verify(mockHeartbeatLock, times(1)).unlock();
+ }
+
+ @Test
+ void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() {
+ when(mockHeartbeatLock.tryLock()).thenReturn(true);
+ C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class);
+
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat);
+ C2HeartbeatResponse mockC2HeartbeatResponse =
mock(C2HeartbeatResponse.class);
+ C2Operation mockOperation1 = mock(C2Operation.class);
+ C2Operation mockOperation2 = mock(C2Operation.class);
+
when(mockC2HeartbeatResponse.getRequestedOperations()).thenReturn(List.of(mockOperation1,
mockOperation2));
+
when(mockC2Client.publishHeartbeat(mockC2Heartbeat)).thenReturn(ofNullable(mockC2HeartbeatResponse));
+
+ testHeartbeatManager.run();
+
+ verify(mockC2HeartbeatFactory,
times(1)).create(mockRuntimeInfoWrapper);
+ verify(mockC2Client, times(1)).publishHeartbeat(mockC2Heartbeat);
+ verify(mockC2OperationManager, times(1)).add(mockOperation1);
+ verify(mockC2OperationManager, times(1)).add(mockOperation2);
+ verify(mockHeartbeatLock, times(1)).unlock();
+ }
+
+ @Test
+ void shouldReleaseHeartbeatLockWhenExceptionOccurs() {
+ when(mockHeartbeatLock.tryLock()).thenReturn(true);
+
when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new
RuntimeException());
+
+ testHeartbeatManager.run();
+
+ verify(mockC2HeartbeatFactory,
times(1)).create(mockRuntimeInfoWrapper);
+ verify(mockC2Client, never()).publishHeartbeat(any());
+ verify(mockC2OperationManager, never()).add(any());
+ verify(mockHeartbeatLock, times(1)).unlock();
+ }
+}
diff --git
a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
new file mode 100644
index 0000000000..d531d73ddf
--- /dev/null
+++
b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2OperationManagerTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2OperationManagerTest {
+
+ private static final long MAX_WAIT_TIME_MS = 1000;
+
+ @Mock
+ private C2Client mockC2Client;
+
+ @Mock
+ private C2OperationHandlerProvider mockC2OperationHandlerProvider;
+
+ @Mock
+ private ReentrantLock mockHeartbeatLock;
+
+ @Mock
+ private OperationQueueDAO mockOperationQueueDAO;
+
+ @Mock
+ private C2OperationRestartHandler mockC2OperationRestartHandler;
+
+ @InjectMocks
+ private C2OperationManager testC2OperationManager;
+
+ @Captor
+ ArgumentCaptor<C2OperationAck> c2OperationAckArgumentCaptor;
+
+ private ExecutorService executorService;
+
+ @BeforeEach
+ void setup() {
+ executorService = newVirtualThreadPerTaskExecutor();
+ }
+
+ @AfterEach
+ void teardown() {
+ executorService.shutdownNow();
+ }
+
+ @Test
+ void shouldWaitForIncomingOperationThenTimeout() {
+ Future<?> future = executorService.submit(testC2OperationManager);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockC2OperationHandlerProvider,
never()).getHandlerForOperation(any());
+ }
+
+ @Test
+ void shouldContinueWithoutProcessingWhenNoHandlerIsDefined() {
+ C2Operation testOperation = mock(C2Operation.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(testOperation)).thenReturn(empty());
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+ testC2OperationManager.add(testOperation);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockC2Client, never()).acknowledgeOperation(any());
+ verify(mockHeartbeatLock, never()).lock();
+ }
+
+ @Test
+ void shouldProcessOperationWithoutRestartAndAcknowledge() {
+ C2Operation mockOperation = mock(C2Operation.class);
+ C2OperationHandler mockOperationHandler =
mock(C2OperationHandler.class);
+ C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+ when(mockOperationHandler.requiresRestart()).thenReturn(false);
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+ testC2OperationManager.add(mockOperation);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockC2Client,
times(1)).acknowledgeOperation(mockC2OperationAck);
+ verify(mockHeartbeatLock, never()).lock();
+ }
+
+ @Test
+ void shouldProcessOperationWithSuccessfulRestart() {
+ C2Operation mockOperation = mock(C2Operation.class);
+ C2OperationHandler mockOperationHandler =
mock(C2OperationHandler.class);
+ C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+ C2OperationState mockC2OperationState = mock(C2OperationState.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+ when(mockOperationHandler.requiresRestart()).thenReturn(true);
+
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
+ when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
+
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(empty());
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+ testC2OperationManager.add(mockOperation);
+
+ assertDoesNotThrow(() -> future.get());
+ verify(mockC2Client, never()).acknowledgeOperation(mockC2OperationAck);
+ verify(mockHeartbeatLock, times(1)).lock();
+ verify(mockHeartbeatLock, never()).unlock();
+ verify(mockOperationQueueDAO, times(1)).save(any());
+ verify(mockOperationQueueDAO, never()).cleanup();
+ }
+
+ @Test
+ void shouldProcessOperationWithFailedRestartDueToFailedResponse() {
+ C2Operation mockOperation = mock(C2Operation.class);
+ C2OperationHandler mockOperationHandler =
mock(C2OperationHandler.class);
+ C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+ C2OperationState mockC2OperationState = mock(C2OperationState.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+ when(mockOperationHandler.requiresRestart()).thenReturn(true);
+
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
+ when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
+
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenReturn(ofNullable(NOT_APPLIED));
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+ testC2OperationManager.add(mockOperation);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockHeartbeatLock, times(1)).lock();
+ verify(mockHeartbeatLock, times(1)).unlock();
+ verify(mockC2Client,
times(1)).acknowledgeOperation(mockC2OperationAck);
+ verify(mockOperationQueueDAO, times(1)).save(any());
+ verify(mockOperationQueueDAO, times(1)).cleanup();
+ }
+
+ @Test
+ void shouldProcessOperationWithFailedRestartDueToException() {
+ C2Operation mockOperation = mock(C2Operation.class);
+ C2OperationHandler mockOperationHandler =
mock(C2OperationHandler.class);
+ C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+ C2OperationState mockC2OperationState = mock(C2OperationState.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockOperation)).thenReturn(ofNullable(mockOperationHandler));
+
when(mockOperationHandler.handle(mockOperation)).thenReturn(mockC2OperationAck);
+ when(mockOperationHandler.requiresRestart()).thenReturn(true);
+
when(mockC2OperationAck.getOperationState()).thenReturn(mockC2OperationState);
+ when(mockC2OperationState.getState()).thenReturn(FULLY_APPLIED);
+
when(mockC2OperationRestartHandler.handleRestart(mockOperation)).thenThrow(new
RuntimeException());
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+ testC2OperationManager.add(mockOperation);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockHeartbeatLock, times(1)).lock();
+ verify(mockHeartbeatLock, times(1)).unlock();
+ verify(mockC2Client,
times(1)).acknowledgeOperation(mockC2OperationAck);
+ verify(mockOperationQueueDAO, times(1)).save(any());
+ verify(mockOperationQueueDAO, times(1)).cleanup();
+ }
+
+ @Test
+ void shouldProcessStateWithOneCurrentAndNoRemainingOperations() {
+ OperationQueue mockOperationQueue = mock(OperationQueue.class);
+ C2Operation mockCurrentOperation = mock(C2Operation.class);
+
when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation);
+
when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of());
+
when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue));
+
when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockHeartbeatLock, never()).lock();
+ verify(mockHeartbeatLock, never()).unlock();
+ verify(mockC2Client,
times(1)).acknowledgeOperation(c2OperationAckArgumentCaptor.capture());
+ assertEquals(FULLY_APPLIED,
c2OperationAckArgumentCaptor.getValue().getOperationState().getState());
+ }
+
+ @Test
+ void shouldProcessStateWithOneCurrentAndOneRemainingOperation() {
+ OperationQueue mockOperationQueue = mock(OperationQueue.class);
+ C2Operation mockCurrentOperation = mock(C2Operation.class);
+ C2Operation mockRemainingOperation = mock(C2Operation.class);
+
when(mockOperationQueue.getCurrentOperation()).thenReturn(mockCurrentOperation);
+
when(mockOperationQueue.getRemainingOperations()).thenReturn(List.of(mockRemainingOperation));
+
when(mockOperationQueueDAO.load()).thenReturn(ofNullable(mockOperationQueue));
+
when(mockC2OperationRestartHandler.waitForResponse()).thenReturn(ofNullable(FULLY_APPLIED));
+ C2OperationHandler mockOperationHandler =
mock(C2OperationHandler.class);
+ C2OperationAck mockC2OperationAck = mock(C2OperationAck.class);
+
when(mockC2OperationHandlerProvider.getHandlerForOperation(mockRemainingOperation)).thenReturn(ofNullable(mockOperationHandler));
+
when(mockOperationHandler.handle(mockRemainingOperation)).thenReturn(mockC2OperationAck);
+ when(mockOperationHandler.requiresRestart()).thenReturn(false);
+
+ Future<?> future = executorService.submit(testC2OperationManager);
+
+ assertThrows(TimeoutException.class, () ->
future.get(MAX_WAIT_TIME_MS, MILLISECONDS));
+ verify(mockHeartbeatLock, times(1)).lock();
+ verify(mockHeartbeatLock, times(1)).unlock();
+ verify(mockC2Client, times(2)).acknowledgeOperation(any());
+ }
+}
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
index f685519f04..07e2c16a50 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
+++
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java
@@ -91,6 +91,7 @@ public enum MiNiFiProperties {
C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true,
false, VALID),
C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false,
false, VALID),
C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true,
VALID),
+ C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT("c2.bootstrap.acknowledge.timeout", "15
sec", false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null,
false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path",
null, false, true, VALID),
NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds",
null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
new file mode 100644
index 0000000000..76632381dc
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
+import org.apache.nifi.c2.client.service.operation.C2OperationRestartHandler;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BootstrapC2OperationRestartHandler implements
C2OperationRestartHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BootstrapC2OperationRestartHandler.class);
+
+ private static final String ACKNOWLEDGE_OPERATION =
"ACKNOWLEDGE_OPERATION";
+ private static final String TIMEOUT = "timeout";
+ private static final Map<MiNiFiCommandState, OperationState>
OPERATION_STATE_MAP = Map.of(
+ MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
+ MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
+ MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, NOT_APPLIED,
+ MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED);
+
+ private final BootstrapCommunicator bootstrapCommunicator;
+ private final BlockingQueue<OperationState> operationStateHolder;
+ private final long bootstrapAcknowledgeTimeoutMs;
+
+ public BootstrapC2OperationRestartHandler(BootstrapCommunicator
bootstrapCommunicator, long bootstrapAcknowledgeTimeoutMs) {
+ this.bootstrapCommunicator = bootstrapCommunicator;
+ this.operationStateHolder = new ArrayBlockingQueue<>(1);
+ this.bootstrapAcknowledgeTimeoutMs = bootstrapAcknowledgeTimeoutMs;
+ bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION,
this::bootstrapCallback);
+ }
+
+ @Override
+ public Optional<OperationState> handleRestart(C2Operation c2Operation) {
+ CommandResult sendCommandResult = sendBootstrapCommand(c2Operation);
+ if (sendCommandResult == SUCCESS) {
+ LOGGER.debug("Bootstrap successfully received command. Waiting for
response");
+ return waitForResponse();
+ } else {
+ LOGGER.debug("Bootstrap failed to receive command");
+ return Optional.of(NOT_APPLIED);
+ }
+ }
+
+ @Override
+ public Optional<OperationState> waitForResponse() {
+ try {
+ OperationState operationState =
operationStateHolder.poll(bootstrapAcknowledgeTimeoutMs, MILLISECONDS);
+ LOGGER.debug("Bootstrap returned response: {}",
ofNullable(operationState).map(Objects::toString).orElse(TIMEOUT));
+ return Optional.of(ofNullable(operationState).orElse(NOT_APPLIED));
+ } catch (InterruptedException e) {
+ LOGGER.debug("Bootstrap response waiting interrupted, possible due
to Bootstrap is restarting MiNiFi process");
+ return empty();
+ }
+ }
+
+ private CommandResult sendBootstrapCommand(C2Operation c2Operation) {
+ String command = createBootstrapCommand(c2Operation);
+ try {
+ return bootstrapCommunicator.sendCommand(command);
+ } catch (IOException e) {
+ LOGGER.error("Failed to send operation to bootstrap", e);
+ return FAILURE;
+ }
+ }
+
+ private String createBootstrapCommand(C2Operation c2Operation) {
+ return ofNullable(c2Operation.getOperand())
+ .map(operand -> c2Operation.getOperation().name() + "_" +
operand.name())
+ .orElse(c2Operation.getOperation().name());
+ }
+
+ private void bootstrapCallback(String[] params, OutputStream outputStream)
{
+ LOGGER.info("Received acknowledge message from bootstrap process");
+ if (params.length < 1) {
+ LOGGER.error("Invalid arguments coming from bootstrap");
+ return;
+ }
+ MiNiFiCommandState miNiFiCommandState =
MiNiFiCommandState.valueOf(params[0]);
+ OperationState operationState =
OPERATION_STATE_MAP.get(miNiFiCommandState);
+ try {
+ operationStateHolder.put(operationState);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Bootstrap hook thread was interrupted");
+ }
+ }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 0915d87ce6..e0ddec368a 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -17,12 +17,19 @@
package org.apache.nifi.minifi.c2;
+import static java.lang.Boolean.parseBoolean;
+import static java.lang.Integer.parseInt;
+import static java.lang.Long.parseLong;
import static java.util.Optional.ofNullable;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_CLASS;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_AGENT_IDENTIFIER;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_ASSET_DIRECTORY;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_CONFIG_DIRECTORY;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_FULL_HEARTBEAT;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
@@ -45,24 +52,24 @@ import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_KE
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
+import static org.apache.nifi.util.FormatUtils.getPreciseTimeDuration;
import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.http.C2HttpClient;
-import org.apache.nifi.c2.client.service.C2ClientService;
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
+import org.apache.nifi.c2.client.service.C2HeartbeatManager;
+import org.apache.nifi.c2.client.service.C2OperationManager;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.client.service.ManifestHashProvider;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@@ -70,8 +77,7 @@ import
org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
import
org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import
org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
-import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
@@ -81,10 +87,6 @@ import
org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHand
import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
-import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
-import org.apache.nifi.c2.protocol.api.C2OperationState;
-import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.controller.FlowController;
@@ -99,14 +101,13 @@ import
org.apache.nifi.minifi.c2.command.PropertiesPersister;
import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
-import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
import org.apache.nifi.services.FlowService;
-import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,35 +116,25 @@ public class C2NifiClientService {
private static final Logger LOGGER =
LoggerFactory.getLogger(C2NifiClientService.class);
private static final String ROOT_GROUP_ID = "root";
- private static final Long INITIAL_DELAY = 10000L;
private static final Integer TERMINATION_WAIT = 5000;
- private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60;
- private static final String ACKNOWLEDGE_OPERATION =
"ACKNOWLEDGE_OPERATION";
- private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000;
- private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000;
+ private static final Long INITIAL_HEARTBEAT_DELAY_MS = 10000L;
- private static final Map<MiNiFiCommandState, OperationState>
OPERATION_STATE_MAP = Map.of(
- MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED,
- MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION,
- MiNiFiCommandState.NOT_APPLIED_WITH_RESTART,
OperationState.NOT_APPLIED,
- MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART,
OperationState.NOT_APPLIED);
+ private final ScheduledExecutorService heartbeatManagerExecutorService;
+ private final ExecutorService operationManagerExecutorService;
- private final C2ClientService c2ClientService;
private final FlowController flowController;
- private final ScheduledThreadPoolExecutor heartbeatExecutorService;
- private final ScheduledThreadPoolExecutor
bootstrapAcknowledgeExecutorService;
private final ExtensionManifestParser extensionManifestParser;
private final RuntimeManifestService runtimeManifestService;
private final SupportedOperationsProvider supportedOperationsProvider;
- private final RequestedOperationDAO requestedOperationDAO;
- private final BootstrapCommunicator bootstrapCommunicator;
- private final long heartbeatPeriod;
+ private final C2HeartbeatManager c2HeartbeatManager;
+ private final C2OperationManager c2OperationManager;
- private volatile boolean ackReceived = false;
+ private final long heartbeatPeriod;
public C2NifiClientService(NiFiProperties niFiProperties, FlowController
flowController, BootstrapCommunicator bootstrapCommunicator, FlowService
flowService) {
- this.heartbeatExecutorService = new ScheduledThreadPoolExecutor(1);
- this.bootstrapAcknowledgeExecutorService = new
ScheduledThreadPoolExecutor(1);
+ this.heartbeatManagerExecutorService = newScheduledThreadPool(1);
+ this.operationManagerExecutorService = newSingleThreadExecutor();
+
this.extensionManifestParser = new JAXBExtensionManifestParser();
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
@@ -160,36 +151,34 @@ public class C2NifiClientService {
C2HttpClient client = C2HttpClient.create(clientConfig, new
C2JacksonSerializer());
FlowIdHolder flowIdHolder = new
FlowIdHolder(clientConfig.getConfDirectory());
C2HeartbeatFactory heartbeatFactory = new
C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
-
- this.requestedOperationDAO = new
FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir",
"bin"), new ObjectMapper());
String bootstrapConfigFileLocation =
niFiProperties.getProperty("nifi.minifi.bootstrap.file");
-
C2OperationHandlerProvider c2OperationHandlerProvider =
c2OperationHandlerProvider(niFiProperties, flowController, flowService,
flowIdHolder,
client, heartbeatFactory, bootstrapConfigFileLocation,
clientConfig.getC2AssetDirectory());
- this.c2ClientService = new C2ClientService(client, heartbeatFactory,
c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
this.supportedOperationsProvider = new
SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
- this.bootstrapCommunicator = bootstrapCommunicator;
-
this.bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION,
(params, output) -> acknowledgeHandler(params));
+ OperationQueueDAO operationQueueDAO =
+ new
FileBasedOperationQueueDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir",
"bin"), new ObjectMapper());
+ ReentrantLock heartbeatLock = new ReentrantLock();
+ BootstrapC2OperationRestartHandler c2OperationRestartHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
clientConfig.getBootstrapAcknowledgeTimeout());
+
+ this.c2OperationManager = new C2OperationManager(
+ client, c2OperationHandlerProvider, heartbeatLock,
operationQueueDAO, c2OperationRestartHandler);
+ this.c2HeartbeatManager = new C2HeartbeatManager(
+ client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(),
c2OperationManager);
}
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
return new C2ClientConfig.Builder()
.agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(),
C2_AGENT_CLASS.getDefaultValue()))
.agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey()))
-
.fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(),
C2_FULL_HEARTBEAT.getDefaultValue())))
-
.heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
- C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
- .connectTimeout((long)
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(),
- C2_REST_CONNECTION_TIMEOUT.getDefaultValue()),
TimeUnit.MILLISECONDS))
- .readTimeout((long)
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(),
- C2_REST_READ_TIMEOUT.getDefaultValue()),
TimeUnit.MILLISECONDS))
- .callTimeout((long)
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(),
- C2_REST_CALL_TIMEOUT.getDefaultValue()),
TimeUnit.MILLISECONDS))
-
.maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(),
C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
- .keepAliveDuration((long)
FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
- C2_KEEP_ALIVE_DURATION.getDefaultValue()),
TimeUnit.MILLISECONDS))
+
.fullHeartbeat(parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(),
C2_FULL_HEARTBEAT.getDefaultValue())))
+
.heartbeatPeriod(parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
+ .connectTimeout(durationPropertyInMilliSecs(properties,
C2_REST_CONNECTION_TIMEOUT))
+ .readTimeout(durationPropertyInMilliSecs(properties,
C2_REST_READ_TIMEOUT))
+ .callTimeout(durationPropertyInMilliSecs(properties,
C2_REST_CALL_TIMEOUT))
+
.maxIdleConnections(parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(),
C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
+ .keepAliveDuration(durationPropertyInMilliSecs(properties,
C2_KEEP_ALIVE_DURATION))
.httpHeaders(properties.getProperty(C2_REST_HTTP_HEADERS.getKey(),
C2_REST_HTTP_HEADERS.getDefaultValue()))
.c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(),
C2_REQUEST_COMPRESSION.getDefaultValue()))
.c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(),
C2_ASSET_DIRECTORY.getDefaultValue()))
@@ -207,9 +196,14 @@ public class C2NifiClientService {
.c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(),
C2_REST_PATH_BASE.getDefaultValue()))
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(),
C2_REST_PATH_HEARTBEAT.getDefaultValue()))
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(),
C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
+
.bootstrapAcknowledgeTimeout(durationPropertyInMilliSecs(properties,
C2_BOOTSTRAP_ACKNOWLEDGE_TIMEOUT))
.build();
}
+ private long durationPropertyInMilliSecs(NiFiProperties properties,
MiNiFiProperties property) {
+ return (long)
getPreciseTimeDuration(properties.getProperty(property.getKey(),
property.getDefaultValue()), MILLISECONDS);
+ }
+
private C2OperationHandlerProvider
c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController
flowController, FlowService flowService,
FlowIdHolder
flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory,
String
bootstrapConfigFileLocation, String c2AssetDirectory) {
@@ -240,107 +234,29 @@ public class C2NifiClientService {
}
public void start() {
- handleOngoingOperations();
- heartbeatExecutorService.scheduleAtFixedRate(() ->
c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY,
heartbeatPeriod, TimeUnit.MILLISECONDS);
- }
-
- // need to be synchronized to prevent parallel run coming from
acknowledgeHandler/ackTimeoutTask
- private synchronized void handleOngoingOperations() {
- Optional<OperationQueue> operationQueue = requestedOperationDAO.load();
- LOGGER.info("Handling ongoing operations: {}", operationQueue);
- if (operationQueue.isPresent()) {
- try {
- waitForAcknowledgeFromBootstrap();
-
c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
- } catch (Exception e) {
- LOGGER.error("Failed to process c2 operations queue", e);
- c2ClientService.enableHeartbeat();
- }
- } else {
- c2ClientService.enableHeartbeat();
- }
- }
-
- private void waitForAcknowledgeFromBootstrap() {
- LOGGER.info("Waiting for ACK signal from Bootstrap");
- int currentWaitTime = 0;
- while (!ackReceived) {
- try {
- Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL);
- } catch (InterruptedException e) {
- LOGGER.warn("Thread interrupted while waiting for
Acknowledge");
- }
- currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL;
- if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
- LOGGER.warn("Max wait time ({}) exceeded for waiting ack from
bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
- break;
- }
- }
+ operationManagerExecutorService.execute(c2OperationManager);
+ LOGGER.debug("Scheduling heartbeats with {} ms periodicity",
heartbeatPeriod);
+
heartbeatManagerExecutorService.scheduleAtFixedRate(c2HeartbeatManager,
INITIAL_HEARTBEAT_DELAY_MS, heartbeatPeriod, MILLISECONDS);
}
- private void registerOperation(C2Operation c2Operation) {
+ public void stop() {
+ heartbeatManagerExecutorService.shutdown();
try {
- ackReceived = false;
- registerAcknowledgeTimeoutTask(c2Operation);
- String command = ofNullable(c2Operation.getOperand())
- .map(operand -> c2Operation.getOperation().name() + "_" +
operand.name())
- .orElse(c2Operation.getOperation().name());
- bootstrapCommunicator.sendCommand(command);
- } catch (IOException e) {
- LOGGER.error("Failed to send operation to bootstrap", e);
- throw new UncheckedIOException(e);
- }
- }
-
- private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) {
- bootstrapAcknowledgeExecutorService.schedule(() -> {
- if (!ackReceived) {
- LOGGER.info("Operation requiring restart is failed, and no
restart/acknowledge is happened after {} seconds for {}. Handling remaining
operations.",
- MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation);
- handleOngoingOperations();
+ if
(!heartbeatManagerExecutorService.awaitTermination(TERMINATION_WAIT,
MILLISECONDS)) {
+ heartbeatManagerExecutorService.shutdownNow();
}
- }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- private void acknowledgeHandler(String[] params) {
- LOGGER.info("Received acknowledge message from bootstrap process");
- if (params.length < 1) {
- LOGGER.error("Invalid arguments coming from bootstrap, skipping
acknowledging latest operation");
- return;
- }
-
- Optional<OperationQueue> optionalOperationQueue =
requestedOperationDAO.load();
- ackReceived = true;
- if (optionalOperationQueue.isPresent()) {
- OperationQueue operationQueue = optionalOperationQueue.get();
- C2Operation c2Operation = operationQueue.getCurrentOperation();
- C2OperationAck c2OperationAck = new C2OperationAck();
- c2OperationAck.setOperationId(c2Operation.getIdentifier());
- C2OperationState c2OperationState = new C2OperationState();
- MiNiFiCommandState miNiFiCommandState =
MiNiFiCommandState.valueOf(params[0]);
- OperationState state = OPERATION_STATE_MAP.get(miNiFiCommandState);
- c2OperationState.setState(state);
- c2OperationAck.setOperationState(c2OperationState);
- c2ClientService.sendAcknowledge(c2OperationAck);
- if (MiNiFiCommandState.NO_OPERATION == miNiFiCommandState ||
MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART == miNiFiCommandState) {
- LOGGER.debug("No restart happened because of an error / the
app was already in the desired state");
- handleOngoingOperations();
- }
- } else {
- LOGGER.error("Can not send acknowledge due to empty Operation
Queue");
+ } catch (InterruptedException ignore) {
+ LOGGER.info("Stopping C2 heartbeat executor service was
interrupted, forcing shutdown");
+ heartbeatManagerExecutorService.shutdownNow();
}
- }
-
- public void stop() {
- bootstrapAcknowledgeExecutorService.shutdownNow();
- heartbeatExecutorService.shutdown();
+ operationManagerExecutorService.shutdown();
try {
- if (!heartbeatExecutorService.awaitTermination(TERMINATION_WAIT,
TimeUnit.MILLISECONDS)) {
- heartbeatExecutorService.shutdownNow();
+ if
(!operationManagerExecutorService.awaitTermination(TERMINATION_WAIT,
MILLISECONDS)) {
+ operationManagerExecutorService.shutdownNow();
}
} catch (InterruptedException ignore) {
- LOGGER.info("Stopping C2 Client's thread was interrupted but
shutting down anyway the C2NifiClientService");
- heartbeatExecutorService.shutdownNow();
+ LOGGER.info("Stopping C2 operation executor service was
interrupted, forcing shutdown");
+ operationManagerExecutorService.shutdownNow();
}
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
similarity index 87%
rename from
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
rename to
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
index 80b1418c95..72f4c55bbd 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAO.java
@@ -17,22 +17,25 @@
package org.apache.nifi.minifi.c2;
+import static org.slf4j.LoggerFactory.getLogger;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.util.Optional;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
-import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+public class FileBasedOperationQueueDAO implements OperationQueueDAO {
+
+ private static final Logger LOGGER =
getLogger(FileBasedOperationQueueDAO.class);
+
protected static final String REQUESTED_OPERATIONS_FILE_NAME =
"requestedOperations.data";
private final ObjectMapper objectMapper;
private final File requestedOperationsFile;
- public FileBasedRequestedOperationDAO(String runDir, ObjectMapper
objectMapper) {
+ public FileBasedOperationQueueDAO(String runDir, ObjectMapper
objectMapper) {
this.requestedOperationsFile = new File(runDir,
REQUESTED_OPERATIONS_FILE_NAME);
this.objectMapper = objectMapper;
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java
new file mode 100644
index 0000000000..c78155fa9a
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/BootstrapC2OperationRestartHandlerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static
org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.OperationType.START;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class BootstrapC2OperationRestartHandlerTest {
+
+ @Test
+ void shouldReturnNotAppliedWhenBootstrapCommunicatorReturnsFalse() throws
IOException {
+ C2Operation inputOperation = new C2Operation();
+ inputOperation.setOperation(START);
+ BootstrapCommunicator bootstrapCommunicator =
mock(BootstrapCommunicator.class);
+
when(bootstrapCommunicator.sendCommand(START.name())).thenReturn(FAILURE);
+ long bootstrapAcknowledgeTimeoutMs = 0;
+
+ BootstrapC2OperationRestartHandler testHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
bootstrapAcknowledgeTimeoutMs);
+ Optional<OperationState> result =
testHandler.handleRestart(inputOperation);
+
+ assertTrue(result.isPresent());
+ assertEquals(NOT_APPLIED, result.get());
+ }
+
+ @Test
+ void shouldReturnNotAppliedWhenBootstrapCommunicatorThrowsException()
throws IOException {
+ C2Operation inputOperation = new C2Operation();
+ inputOperation.setOperation(START);
+ BootstrapCommunicator bootstrapCommunicator =
mock(BootstrapCommunicator.class);
+ when(bootstrapCommunicator.sendCommand(START.name())).thenThrow(new
IOException());
+ long bootstrapAcknowledgeTimeoutMs = 0;
+
+ BootstrapC2OperationRestartHandler testHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
bootstrapAcknowledgeTimeoutMs);
+ Optional<OperationState> result =
testHandler.handleRestart(inputOperation);
+
+ assertTrue(result.isPresent());
+ assertEquals(NOT_APPLIED, result.get());
+ }
+
+ @Test
+ void shouldReturnStateAcknowledgedByBootstrapCommunicator() {
+ C2Operation inputOperation = new C2Operation();
+ inputOperation.setOperation(START);
+ long bootstrapAcknowledgeTimeoutMs = 1000;
+ long waitBeforeAcknowledgeMs = 100;
+ String[] callbackResult = new String[] {FULLY_APPLIED.name()};
+ BootstrapCommunicatorStub bootstrapCommunicator = new
BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+ BootstrapC2OperationRestartHandler testHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
bootstrapAcknowledgeTimeoutMs);
+ try (ExecutorService executorService =
newVirtualThreadPerTaskExecutor()) {
+ executorService.execute(bootstrapCommunicator);
+ Optional<OperationState> result =
testHandler.handleRestart(inputOperation);
+
+ assertTrue(result.isPresent());
+ assertEquals(FULLY_APPLIED, result.get());
+ }
+ }
+
+ @Test
+ void shouldReturnNotAppliedWhenBootstrapAcknowledgeTimesOut() {
+ C2Operation inputOperation = new C2Operation();
+ inputOperation.setOperation(START);
+ String[] callbackResult = new String[] {FULLY_APPLIED.name()};
+ long bootstrapAcknowledgeTimeoutMs = 1000;
+ long waitBeforeAcknowledgeMs = 2000;
+ BootstrapCommunicatorStub bootstrapCommunicator = new
BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+ BootstrapC2OperationRestartHandler testHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
bootstrapAcknowledgeTimeoutMs);
+ try (ExecutorService executorService =
newVirtualThreadPerTaskExecutor()) {
+ executorService.execute(bootstrapCommunicator);
+ Optional<OperationState> result =
testHandler.handleRestart(inputOperation);
+
+ assertTrue(result.isPresent());
+ assertEquals(NOT_APPLIED, result.get());
+ }
+ }
+
+ @Test
+ void shouldReturnNotAppliedWhenBootstrapSendInvalidResponse() {
+ C2Operation inputOperation = new C2Operation();
+ inputOperation.setOperation(START);
+ String[] callbackResult = new String[] {};
+ long bootstrapAcknowledgeTimeoutMs = 1000;
+ long waitBeforeAcknowledgeMs = 100;
+ BootstrapCommunicatorStub bootstrapCommunicator = new
BootstrapCommunicatorStub(SUCCESS, callbackResult, waitBeforeAcknowledgeMs);
+
+ BootstrapC2OperationRestartHandler testHandler = new
BootstrapC2OperationRestartHandler(bootstrapCommunicator,
bootstrapAcknowledgeTimeoutMs);
+ try (ExecutorService executorService =
newVirtualThreadPerTaskExecutor()) {
+ executorService.execute(bootstrapCommunicator);
+ Optional<OperationState> result =
testHandler.handleRestart(inputOperation);
+
+ assertTrue(result.isPresent());
+ assertEquals(NOT_APPLIED, result.get());
+ }
+ }
+
+ static class BootstrapCommunicatorStub implements BootstrapCommunicator,
Runnable {
+
+ private final CommandResult sendCommandResult;
+ private final String[] callbackResult;
+ private final long waitBeforeAcknowledgeMs;
+ private BiConsumer<String[], OutputStream> handler;
+
+ BootstrapCommunicatorStub(CommandResult sendCommandResult, String[]
callbackResult, long waitBeforeAcknowledgeMs) {
+ this.sendCommandResult = sendCommandResult;
+ this.callbackResult = callbackResult;
+ this.waitBeforeAcknowledgeMs = waitBeforeAcknowledgeMs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ sleep(waitBeforeAcknowledgeMs);
+ } catch (InterruptedException ignore) {
+ }
+ handler.accept(callbackResult, null);
+ }
+
+ @Override
+ public CommandResult sendCommand(String command, String... args) {
+ return sendCommandResult;
+ }
+
+ @Override
+ public void registerMessageHandler(String command,
BiConsumer<String[], OutputStream> handler) {
+ this.handler = handler;
+ }
+ }
+
+}
+
+
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
similarity index 90%
rename from
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
rename to
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
index 962e98b2e9..b2ae03cd03 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedOperationQueueDAOTest.java
@@ -17,7 +17,7 @@
package org.apache.nifi.minifi.c2;
-import static
org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static
org.apache.nifi.minifi.c2.FileBasedOperationQueueDAO.REQUESTED_OPERATIONS_FILE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import org.apache.nifi.c2.client.service.operation.OperationQueue;
import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -45,7 +46,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-class FileBasedRequestedOperationDAOTest {
+class FileBasedOperationQueueDAOTest {
@Mock
private ObjectMapper objectMapper;
@@ -53,11 +54,11 @@ class FileBasedRequestedOperationDAOTest {
@TempDir
File tmpDir;
- private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+ private FileBasedOperationQueueDAO fileBasedRequestedOperationDAO;
@BeforeEach
void setup() {
- fileBasedRequestedOperationDAO = new
FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+ fileBasedRequestedOperationDAO = new
FileBasedOperationQueueDAO(tmpDir.getAbsolutePath(), objectMapper);
}
@Test
@@ -109,6 +110,6 @@ class FileBasedRequestedOperationDAOTest {
C2Operation currentOperation = new C2Operation();
currentOperation.setIdentifier("id2");
- return new OperationQueue(currentOperation,
Collections.singletonList(c2Operation));
+ return new OperationQueue(currentOperation, List.of(c2Operation));
}
}
\ No newline at end of file
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
index 911ddc44a7..fa6b5db211 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
@@ -14,8 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.minifi.bootstrap;
+import static org.apache.nifi.bootstrap.CommandResult.FAILURE;
+import static org.apache.nifi.bootstrap.CommandResult.SUCCESS;
+
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
@@ -38,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.bootstrap.CommandResult;
import org.apache.nifi.minifi.MiNiFiServer;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.minifi.status.StatusRequestException;
@@ -90,29 +95,29 @@ public class BootstrapListener implements
BootstrapCommunicator {
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
- sendCommand("PORT", new String[]{String.valueOf(localPort),
secretKey});
+ sendCommand("PORT", new String[] {String.valueOf(localPort),
secretKey});
}
public void reload() throws IOException {
if (listener != null) {
listener.stop();
}
- sendCommand(RELOAD, new String[]{});
+ sendCommand(RELOAD, new String[] {});
}
public void stop() throws IOException {
if (listener != null) {
listener.stop();
}
- sendCommand(SHUTDOWN, new String[]{});
+ sendCommand(SHUTDOWN, new String[] {});
}
public void sendStartedStatus(boolean status) throws IOException {
logger.debug("Notifying Bootstrap that the status of starting MiNiFi
is {}", status);
- sendCommand(STARTED, new String[]{String.valueOf(status)});
+ sendCommand(STARTED, new String[] {String.valueOf(status)});
}
- public void sendCommand(String command, String[] args) throws IOException {
+ public CommandResult sendCommand(String command, String[] args) throws
IOException {
try (Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", bootstrapPort));
@@ -134,8 +139,10 @@ public class BootstrapListener implements
BootstrapCommunicator {
String response = reader.readLine();
if ("OK".equals(response)) {
logger.info("Successfully initiated communication with
Bootstrap");
+ return SUCCESS;
} else {
logger.error("Failed to communicate with Bootstrap. Bootstrap
may be unable to issue or receive commands from MiNiFi");
+ return FAILURE;
}
}
}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
index 41784233c7..1bfc1563d9 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
@@ -25,11 +25,13 @@ public interface BootstrapCommunicator {
/**
* Sends a command with specific arguments to the bootstrap process
+ *
* @param command the command to send
- * @param args the args to send
+ * @param args the args to send
+ * @return {@link CommandResult} of the command sent to Bootstrap
* @throws IOException exception in case of communication issue
*/
- void sendCommand(String command, String... args) throws IOException;
+ CommandResult sendCommand(String command, String... args) throws
IOException;
/**
* Register a handler for messages coming from bootstrap process
diff --git
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
similarity index 55%
rename from
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
rename to
nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
index 1216aa812d..ebb9bde6d3 100644
---
a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/CommandResult.java
@@ -15,31 +15,9 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.client.service.operation;
-
-import java.util.Optional;
-
-/**
- * The purpose of this interface is to be able to persist operations between
restarts.
- */
-public interface RequestedOperationDAO {
-
- /**
- * Persist the given requested operation list
- * @param operationQueue the queue containing the current and remaining
operations
- */
- void save(OperationQueue operationQueue);
-
- /**
- * Returns the saved Operations
- *
- * @return the C2 Operations queue with the actual operation
- */
- Optional<OperationQueue> load();
-
- /**
- * Resets the saved operations
- */
- void cleanup();
+package org.apache.nifi.bootstrap;
+public enum CommandResult {
+ FAILURE,
+ SUCCESS
}