This is an automated email from the ASF dual-hosted git repository.
bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 86acde2fe2 NIFI-10750 Move C2NiFiClientService to minifi-framework
86acde2fe2 is described below
commit 86acde2fe279253b8f69c82c907f5ef85afb3854
Author: Ferenc Erdei <[email protected]>
AuthorDate: Wed Nov 2 17:04:55 2022 +0100
NIFI-10750 Move C2NiFiClientService to minifi-framework
Signed-off-by: Csaba Bejan <[email protected]>
This closes #6612.
---
minifi/minifi-assembly/pom.xml | 4 -
.../java/org/apache/nifi/minifi/MiNiFiServer.java | 2 +
.../nifi/minifi/status/StatusRequestException.java | 2 +-
.../minifi-framework/minifi-framework-core/pom.xml | 5 +
.../apache/nifi/minifi}/c2/C2NiFiProperties.java | 2 +-
.../nifi/minifi}/c2/C2NifiClientService.java | 7 +-
.../c2/command/TransferDebugCommandHelper.java | 2 +-
.../c2/command/UpdateAssetCommandHelper.java | 2 +-
.../c2/command/TransferDebugCommandHelperTest.java | 2 +-
.../c2/command/UpdateAssetCommandHelperTest.java | 2 +-
.../org/apache/nifi/minifi/BootstrapListener.java | 438 ---------------------
.../main/java/org/apache/nifi/minifi/MiNiFi.java | 74 +---
.../nifi/minifi/bootstrap/BootstrapListener.java | 300 ++++++++++++++
.../nifi/minifi/bootstrap/BootstrapRequest.java} | 24 +-
.../minifi/bootstrap/BootstrapRequestReader.java | 71 ++++
.../org/apache/nifi/minifi/bootstrap/DumpUtil.java | 125 ++++++
.../apache/nifi/minifi/StandardMiNiFiServer.java | 86 +++-
.../nifi/bootstrap/BootstrapCommunicator.java | 34 +-
.../nifi-framework/nifi-framework-core/pom.xml | 5 -
.../nifi/controller/StandardFlowService.java | 20 -
.../apache/nifi/headless/HeadlessNiFiServer.java | 14 +-
21 files changed, 651 insertions(+), 570 deletions(-)
diff --git a/minifi/minifi-assembly/pom.xml b/minifi/minifi-assembly/pom.xml
index ff8f205b1c..186ab4ddbb 100644
--- a/minifi/minifi-assembly/pom.xml
+++ b/minifi/minifi-assembly/pom.xml
@@ -121,10 +121,6 @@ limitations under the License.
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-framework-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>c2-client-api</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-framework-nar</artifactId>
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
index ea6f5a7a76..316605f263 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
@@ -25,4 +25,6 @@ import org.apache.nifi.minifi.status.StatusRequestException;
*/
public interface MiNiFiServer extends NiFiServer {
FlowStatusReport getStatusReport(String requestString) throws
StatusRequestException;
+
+ void stop(boolean reload);
}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
index 2a418ede56..de5aebc734 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.minifi.status;
-public class StatusRequestException extends Exception {
+public class StatusRequestException extends RuntimeException {
private static final long serialVersionUID = 1L;
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
index 602c15a1d9..dd59840d50 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
@@ -31,6 +31,11 @@ limitations under the License.
<artifactId>minifi-framework-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>c2-client-service</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
similarity index 99%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
index 840da2219d..71282c0ed5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.c2;
+package org.apache.nifi.minifi.c2;
import java.util.concurrent.TimeUnit;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
similarity index 98%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 795b3b4323..e280386e90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.c2;
+package org.apache.nifi.minifi.c2;
import static java.util.Optional.ofNullable;
@@ -46,8 +46,8 @@ import
org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import
org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
import
org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
-import org.apache.nifi.c2.command.TransferDebugCommandHelper;
-import org.apache.nifi.c2.command.UpdateAssetCommandHelper;
+import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
+import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
@@ -88,6 +88,7 @@ public class C2NifiClientService {
private final RuntimeManifestService runtimeManifestService;
private final SupportedOperationsProvider supportedOperationsProvider;
+
private final long heartbeatPeriod;
public C2NifiClientService(final NiFiProperties niFiProperties, final
FlowController flowController) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelper.java
similarity index 98%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelper.java
index a92517e6f9..00b923958b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.command;
+package org.apache.nifi.minifi.c2.command;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.collectingAndThen;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelper.java
similarity index 98%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelper.java
index 2231dd481b..8c36d5f328 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.command;
+package org.apache.nifi.minifi.c2.command;
import java.io.IOException;
import java.io.UncheckedIOException;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelperTest.java
similarity index 98%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelperTest.java
index 2dfbb68002..3b156fcb2b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/TransferDebugCommandHelperTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.command;
+package org.apache.nifi.minifi.c2.command;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelperTest.java
similarity index 99%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java
rename to minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelperTest.java
index ffa78c505f..7968b92eb0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdateAssetCommandHelperTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.c2.command;
+package org.apache.nifi.minifi.c2.command;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
deleted file mode 100644
index 3466ea26c1..0000000000
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.lang.management.LockInfo;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MonitorInfo;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.nifi.minifi.commons.status.FlowStatusReport;
-import org.apache.nifi.minifi.status.StatusRequestException;
-import org.apache.nifi.util.LimitingInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BootstrapListener {
-
- private static final Logger logger =
LoggerFactory.getLogger(org.apache.nifi.BootstrapListener.class);
-
- private final MiNiFi minifi;
- private final int bootstrapPort;
- private final String secretKey;
- private final ObjectMapper objectMapper;
-
- private volatile Listener listener;
-
- public BootstrapListener(final MiNiFi minifi, final int bootstrapPort) {
- this.minifi = minifi;
- this.bootstrapPort = bootstrapPort;
- secretKey = UUID.randomUUID().toString();
-
- objectMapper = new ObjectMapper();
- objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- }
-
- public void start() throws IOException {
- logger.debug("Starting Bootstrap Listener to communicate with
Bootstrap Port {}", bootstrapPort);
-
- ServerSocket serverSocket = new ServerSocket();
- serverSocket.bind(new InetSocketAddress("localhost", 0));
- serverSocket.setSoTimeout(2000);
-
- final int localPort = serverSocket.getLocalPort();
- logger.info("Started Bootstrap Listener, Listening for incoming
requests on port {}", localPort);
-
- listener = new Listener(serverSocket);
- final Thread listenThread = new Thread(listener);
- listenThread.setDaemon(true);
- listenThread.setName("Listen to Bootstrap");
- listenThread.start();
-
- logger.debug("Notifying Bootstrap that local port is {}", localPort);
- sendCommand("PORT", new String[]{String.valueOf(localPort),
secretKey});
- }
-
- public void reload() throws IOException {
- if (listener != null) {
- listener.stop();
- }
- sendCommand("RELOAD", new String[]{});
- }
-
- public void stop() throws IOException {
- if (listener != null) {
- listener.stop();
- }
- sendCommand("SHUTDOWN", new String[]{});
- }
-
- public void sendStartedStatus(boolean status) throws IOException {
- logger.debug("Notifying Bootstrap that the status of starting MiNiFi
is {}", status);
- sendCommand("STARTED", new String[]{String.valueOf(status)});
- }
-
- private void sendCommand(final String command, final String[] args) throws
IOException {
- try (final Socket socket = new Socket()) {
- socket.setSoTimeout(60000);
- socket.connect(new InetSocketAddress("localhost", bootstrapPort));
- socket.setSoTimeout(60000);
-
- final StringBuilder commandBuilder = new StringBuilder(command);
- for (final String arg : args) {
- commandBuilder.append(" ").append(arg);
- }
- commandBuilder.append("\n");
-
- final String commandWithArgs = commandBuilder.toString();
- logger.debug("Sending command to Bootstrap: " + commandWithArgs);
-
- final OutputStream out = socket.getOutputStream();
- out.write((commandWithArgs).getBytes(StandardCharsets.UTF_8));
- out.flush();
-
- logger.debug("Awaiting response from Bootstrap...");
- final BufferedReader reader = new BufferedReader(new
InputStreamReader(socket.getInputStream()));
- final String response = reader.readLine();
- if ("OK".equals(response)) {
- logger.info("Successfully initiated communication with
Bootstrap");
- } else {
- logger.error("Failed to communicate with Bootstrap. Bootstrap
may be unable to issue or receive commands from MiNiFi");
- }
- }
- }
-
- private class Listener implements Runnable {
-
- private final ServerSocket serverSocket;
- private final ExecutorService executor;
- private volatile boolean stopped = false;
-
- public Listener(final ServerSocket serverSocket) {
- this.serverSocket = serverSocket;
- this.executor = Executors.newFixedThreadPool(2);
- }
-
- public void stop() {
- stopped = true;
-
- executor.shutdownNow();
-
- try {
- serverSocket.close();
- } catch (final IOException ioe) {
- // nothing to really do here. we could log this, but it would
just become
- // confusing in the logs, as we're shutting down and there's
no real benefit
- }
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- final Socket socket;
- try {
- logger.debug("Listening for Bootstrap Requests");
- socket = serverSocket.accept();
- } catch (final SocketTimeoutException ste) {
- if (stopped) {
- return;
- }
-
- continue;
- } catch (final IOException ioe) {
- if (stopped) {
- return;
- }
-
- throw ioe;
- }
-
- logger.debug("Received connection from Bootstrap");
- socket.setSoTimeout(5000);
-
- executor.submit(() -> {
- try {
- final BootstrapRequest request =
readRequest(socket.getInputStream());
- final BootstrapRequest.RequestType requestType =
request.getRequestType();
-
- switch (requestType) {
- case PING:
- logger.debug("Received PING request from
Bootstrap; responding");
- echoPing(socket.getOutputStream());
- logger.debug("Responded to PING request
from Bootstrap");
- break;
- case RELOAD:
- logger.info("Received RELOAD request from
Bootstrap");
- echoReload(socket.getOutputStream());
- minifi.shutdownHook(true);
- return;
- case SHUTDOWN:
- logger.info("Received SHUTDOWN request
from Bootstrap");
- echoShutdown(socket.getOutputStream());
- minifi.shutdownHook(false);
- return;
- case DUMP:
- logger.info("Received DUMP request from
Bootstrap");
- writeDump(socket.getOutputStream());
- break;
- case FLOW_STATUS_REPORT:
- logger.info("Received FLOW_STATUS_REPORT
request from Bootstrap");
- String flowStatusRequestString =
request.getArgs()[0];
- writeStatusReport(flowStatusRequestString,
socket.getOutputStream());
- break;
- case ENV:
- logger.info("Received ENV request from
Bootstrap");
- writeEnv(socket.getOutputStream());
- break;
- }
- } catch (final Throwable t) {
- logger.error("Failed to process request from
Bootstrap due to " + t.toString(), t);
- } finally {
- try {
- socket.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close socket to
Bootstrap due to {}", ioe.toString());
- }
- }
- });
- } catch (final Throwable t) {
- logger.error("Failed to process request from Bootstrap due
to " + t.toString(), t);
- }
- }
- }
- }
-
- private void writeStatusReport(String flowStatusRequestString, final
OutputStream out) throws IOException, StatusRequestException {
- FlowStatusReport flowStatusReport =
minifi.getMinifiServer().getStatusReport(flowStatusRequestString);
- objectMapper.writeValue(out, flowStatusReport);
- }
-
- private static void writeEnv(OutputStream out) throws IOException {
- try (BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(out))) {
- StringBuilder sb = new StringBuilder();
-
- System.getProperties()
- .entrySet()
- .stream()
- .forEach(entry ->
sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n"));
-
- writer.write(sb.toString());
- writer.flush();
- }
- }
-
- private static void writeDump(final OutputStream out) throws IOException {
- final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
- final BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(out));
-
- final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
- final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
- final long[] monitorDeadlockThreadIds =
mbean.findMonitorDeadlockedThreads();
-
- final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
- sortedInfos.addAll(Arrays.asList(infos));
- sortedInfos.sort(Comparator.comparing(o ->
o.getThreadName().toLowerCase()));
-
- final StringBuilder sb = new StringBuilder();
- for (final ThreadInfo info : sortedInfos) {
- sb.append("\n");
- sb.append("\"").append(info.getThreadName()).append("\" Id=");
- sb.append(info.getThreadId()).append(" ");
- sb.append(info.getThreadState().toString()).append(" ");
-
- switch (info.getThreadState()) {
- case BLOCKED:
- case TIMED_WAITING:
- case WAITING:
- sb.append(" on ");
- sb.append(info.getLockInfo());
- break;
- default:
- break;
- }
-
- if (info.isSuspended()) {
- sb.append(" (suspended)");
- }
- if (info.isInNative()) {
- sb.append(" (in native code)");
- }
-
- if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0)
{
- for (final long id : deadlockedThreadIds) {
- if (id == info.getThreadId()) {
- sb.append(" ** DEADLOCKED THREAD **");
- }
- }
- }
-
- if (monitorDeadlockThreadIds != null &&
monitorDeadlockThreadIds.length > 0) {
- for (final long id : monitorDeadlockThreadIds) {
- if (id == info.getThreadId()) {
- sb.append(" ** MONITOR-DEADLOCKED THREAD **");
- }
- }
- }
-
- final StackTraceElement[] stackTraces = info.getStackTrace();
- for (final StackTraceElement element : stackTraces) {
- sb.append("\n\tat ").append(element);
-
- final MonitorInfo[] monitors = info.getLockedMonitors();
- for (final MonitorInfo monitor : monitors) {
- if (monitor.getLockedStackFrame().equals(element)) {
- sb.append("\n\t- waiting on ").append(monitor);
- }
- }
- }
-
- final LockInfo[] lockInfos = info.getLockedSynchronizers();
- if (lockInfos.length > 0) {
- sb.append("\n\t");
- sb.append("Number of Locked Synchronizers:
").append(lockInfos.length);
- for (final LockInfo lockInfo : lockInfos) {
- sb.append("\n\t- ").append(lockInfo.toString());
- }
- }
-
- sb.append("\n");
- }
-
- if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
- sb.append("\n\nDEADLOCK DETECTED!");
- sb.append("\nThe following thread IDs are deadlocked:");
- for (final long id : deadlockedThreadIds) {
- sb.append("\n").append(id);
- }
- }
-
- if (monitorDeadlockThreadIds != null &&
monitorDeadlockThreadIds.length > 0) {
- sb.append("\n\nMONITOR DEADLOCK DETECTED!");
- sb.append("\nThe following thread IDs are deadlocked:");
- for (final long id : monitorDeadlockThreadIds) {
- sb.append("\n").append(id);
- }
- }
-
- writer.write(sb.toString());
- writer.flush();
- }
-
- private void echoPing(final OutputStream out) throws IOException {
- out.write("PING\n".getBytes(StandardCharsets.UTF_8));
- out.flush();
- }
-
- private void echoShutdown(final OutputStream out) throws IOException {
- out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
- out.flush();
- }
-
- private void echoReload(final OutputStream out) throws IOException {
- out.write("RELOAD\n".getBytes(StandardCharsets.UTF_8));
- out.flush();
- }
-
- @SuppressWarnings("resource") // we don't want to close the stream, as
the caller will do that
- private BootstrapRequest readRequest(final InputStream in) throws
IOException {
- // We want to ensure that we don't try to read data from an
InputStream directly
- // by a BufferedReader because any user on the system could open a
socket and send
- // a multi-gigabyte file without any new lines in order to crash the
NiFi instance
- // (or at least cause OutOfMemoryErrors, which can wreak havoc on the
running instance).
- // So we will limit the Input Stream to only 4 KB, which should be
plenty for any request.
- final LimitingInputStream limitingIn = new LimitingInputStream(in,
4096);
- final BufferedReader reader = new BufferedReader(new
InputStreamReader(limitingIn));
-
- final String line = reader.readLine();
- final String[] splits = line.split(" ");
- if (splits.length < 1) {
- throw new IOException("Received invalid request from Bootstrap: "
+ line);
- }
-
- final String requestType = splits[0];
- final String[] args;
- if (splits.length == 1) {
- throw new IOException("Received invalid request from Bootstrap;
request did not have a secret key; request type = " + requestType);
- } else if (splits.length == 2) {
- args = new String[0];
- } else {
- args = Arrays.copyOfRange(splits, 2, splits.length);
- }
-
- final String requestKey = splits[1];
- if (!secretKey.equals(requestKey)) {
- throw new IOException("Received invalid Secret Key for request
type " + requestType);
- }
-
- try {
- return new BootstrapRequest(requestType, args);
- } catch (final Exception e) {
- throw new IOException("Received invalid request from Bootstrap;
request type = " + requestType);
- }
- }
-
- private static class BootstrapRequest {
-
- public enum RequestType {
- RELOAD,
- SHUTDOWN,
- DUMP,
- PING,
- FLOW_STATUS_REPORT,
- ENV
- }
-
- private final RequestType requestType;
- private final String[] args;
-
- public BootstrapRequest(final String request, final String[] args) {
- this.requestType = RequestType.valueOf(request);
- this.args = args;
- }
-
- public RequestType getRequestType() {
- return requestType;
- }
-
- @SuppressWarnings("unused")
- public String[] getArgs() {
- return args;
- }
- }
-}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
index 2849838fbc..c34cd8d109 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
@@ -16,24 +16,8 @@
*/
package org.apache.nifi.minifi;
-import org.apache.nifi.NiFiServer;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.headless.FlowEnrichmentException;
-import org.apache.nifi.nar.ExtensionMapping;
-import org.apache.nifi.nar.NarClassLoaders;
-import org.apache.nifi.nar.NarClassLoadersHolder;
-import org.apache.nifi.nar.NarUnpacker;
-import org.apache.nifi.nar.SystemBundle;
-import org.apache.nifi.nar.NarUnpackMode;
-import org.apache.nifi.util.FileUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.bridge.SLF4JBridgeHandler;
-
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
@@ -45,28 +29,35 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.nifi.NiFiServer;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.nar.ExtensionMapping;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.nar.NarUnpackMode;
+import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.util.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
public class MiNiFi {
private static final Logger logger = LoggerFactory.getLogger(MiNiFi.class);
private final MiNiFiServer minifiServer;
- private final BootstrapListener bootstrapListener;
-
- public static final String BOOTSTRAP_PORT_PROPERTY =
"nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
private static final String FRAMEWORK_NAR_ID = "minifi-framework-nar";
public MiNiFi(final NiFiProperties properties)
- throws ClassNotFoundException, IOException, NoSuchMethodException,
InstantiationException,
- IllegalAccessException, IllegalArgumentException,
InvocationTargetException, FlowEnrichmentException {
+ throws ClassNotFoundException, IOException,
IllegalArgumentException {
this(properties, ClassLoader.getSystemClassLoader());
}
- public MiNiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
- throws ClassNotFoundException, IOException, NoSuchMethodException,
InstantiationException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException,
- FlowEnrichmentException {
+ public MiNiFi(final NiFiProperties properties, ClassLoader
rootClassLoader) throws ClassNotFoundException, IOException,
IllegalArgumentException {
// There can only be one krb5.conf for the overall Java process so set
this globally during
// start up so that processors and our Kerberos authentication code
don't have to set this
@@ -88,25 +79,6 @@ public class MiNiFi {
shutdownHook(false);
}));
- final String bootstrapPort =
System.getProperty(BOOTSTRAP_PORT_PROPERTY);
- if (bootstrapPort != null) {
- try {
- final int port = Integer.parseInt(bootstrapPort);
-
- if (port < 1 || port > 65535) {
- throw new RuntimeException("Failed to start MiNiFi because
system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the
range 1 - 65535");
- }
-
- bootstrapListener = new BootstrapListener(this, port);
- bootstrapListener.start();
- } catch (final NumberFormatException nfe) {
- throw new RuntimeException("Failed to start MiNiFi because
system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the
range 1 - 65535");
- }
- } else {
- logger.info("MiNiFi started without Bootstrap Port information
provided; will not listen for requests from Bootstrap");
- bootstrapListener = null;
- }
-
// delete the web working dir - if the application does not start
successfully
// the web app directories might be in an invalid state. when this
happens
// jetty will not attempt to re-extract the war into the directory. by
removing
@@ -164,10 +136,6 @@ public class MiNiFi {
} else {
minifiServer.start();
- if (bootstrapListener != null) {
- bootstrapListener.sendStartedStatus(true);
- }
-
final long endTime = System.nanoTime();
final long durationNanos = endTime - startTime;
// Convert to millis for higher precision and then convert to a
float representation of seconds
@@ -184,13 +152,7 @@ public class MiNiFi {
if (minifiServer != null) {
minifiServer.stop();
}
- if (bootstrapListener != null) {
- if (isReload) {
- bootstrapListener.reload();
- } else {
- bootstrapListener.stop();
- }
- }
+
logger.info("MiNiFi server shutdown completed (nicely or
otherwise).");
} catch (final Throwable t) {
logger.warn("Problem occurred ensuring MiNiFi server was properly
terminated due to " + t);
@@ -251,10 +213,6 @@ public class MiNiFi {
timer.schedule(timerTask, 60000L);
}
- MiNiFiServer getMinifiServer() {
- return minifiServer;
- }
-
/**
* Main entry point of the application.
*
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
new file mode 100644
index 0000000000..575645d855
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapListener.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
+import org.apache.nifi.minifi.MiNiFiServer;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.minifi.status.StatusRequestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Line-oriented socket channel between a running MiNiFi instance and its Bootstrap process.
+ * <p>
+ * Outbound commands are sent with {@link #sendCommand(String, String...)} over a short-lived connection to the
+ * Bootstrap port on localhost. Inbound requests arrive on a server socket bound to an ephemeral localhost port
+ * (opened by {@link #start()}); each request is parsed and authenticated by a {@link BootstrapRequestReader} and
+ * dispatched to the handler registered for its request type.
+ */
+public class BootstrapListener implements BootstrapCommunicator {
+
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
+    private static final String RELOAD = "RELOAD";
+    private static final String SHUTDOWN = "SHUTDOWN";
+    private static final String STARTED = "STARTED";
+    private static final int LISTENER_EXECUTOR_THREAD_COUNT = 2;
+
+    private final MiNiFiServer minifiServer;
+    private final BootstrapRequestReader bootstrapRequestReader;
+
+    private final int bootstrapPort;
+    // Secret handed to Bootstrap with the PORT command; the request reader rejects requests that do not carry it
+    private final String secretKey;
+    private final ObjectMapper objectMapper;
+
+    private Listener listener;
+    private final Map<String, BiConsumer<String[], OutputStream>> messageHandlers = new HashMap<>();
+
+    /**
+     * Creates the listener and registers the built-in request handlers. No socket is opened until {@link #start()}.
+     *
+     * @param minifiServer  server that RELOAD / SHUTDOWN / FLOW_STATUS_REPORT requests are delegated to
+     * @param bootstrapPort localhost port on which the Bootstrap process accepts commands
+     */
+    public BootstrapListener(MiNiFiServer minifiServer, int bootstrapPort) {
+        this.minifiServer = minifiServer;
+        this.bootstrapPort = bootstrapPort;
+        secretKey = UUID.randomUUID().toString();
+        bootstrapRequestReader = new BootstrapRequestReader(secretKey);
+
+        objectMapper = new ObjectMapper();
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+        registerHandlers();
+    }
+
+    /**
+     * Opens the inbound server socket, starts the accept loop on a daemon thread and tells Bootstrap which
+     * port and secret key to use.
+     *
+     * @throws IOException if the socket cannot be bound or Bootstrap cannot be notified; in both cases the
+     *                     resources opened so far are released before the exception propagates
+     */
+    public void start() throws IOException {
+        logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
+
+        ServerSocket serverSocket = new ServerSocket();
+        try {
+            serverSocket.bind(new InetSocketAddress("localhost", 0));
+            serverSocket.setSoTimeout(2000);
+        } catch (IOException e) {
+            // do not leak the socket when it could not be set up
+            serverSocket.close();
+            throw e;
+        }
+
+        int localPort = serverSocket.getLocalPort();
+        logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
+
+        listener = new Listener(serverSocket);
+        Thread listenThread = new Thread(listener);
+        listenThread.setDaemon(true);
+        listenThread.setName("Listen to Bootstrap");
+        listenThread.start();
+
+        logger.debug("Notifying Bootstrap that local port is {}", localPort);
+        try {
+            sendCommand("PORT", String.valueOf(localPort), secretKey);
+        } catch (IOException | RuntimeException e) {
+            // Bootstrap never learned the port, so nothing can reach the listener: shut it down again
+            listener.stop();
+            throw e;
+        }
+    }
+
+    /**
+     * Stops accepting requests and sends the RELOAD command to Bootstrap.
+     *
+     * @throws IOException if the command cannot be sent
+     */
+    public void reload() throws IOException {
+        if (listener != null) {
+            listener.stop();
+        }
+        sendCommand(RELOAD);
+    }
+
+    /**
+     * Stops accepting requests and sends the SHUTDOWN command to Bootstrap.
+     *
+     * @throws IOException if the command cannot be sent
+     */
+    public void stop() throws IOException {
+        if (listener != null) {
+            listener.stop();
+        }
+        sendCommand(SHUTDOWN);
+    }
+
+    /**
+     * Sends the STARTED command with the given start-up status to Bootstrap.
+     *
+     * @param status the status value to report
+     * @throws IOException if the command cannot be sent
+     */
+    public void sendStartedStatus(boolean status) throws IOException {
+        logger.debug("Notifying Bootstrap that the status of starting MiNiFi is {}", status);
+        sendCommand(STARTED, String.valueOf(status));
+    }
+
+    /**
+     * Sends {@code command} followed by its space-separated {@code args} as one line to Bootstrap and waits
+     * (up to 60 seconds per read) for the response line. A response other than {@code OK} is logged as an error
+     * but does not raise an exception.
+     *
+     * @param command the command to send
+     * @param args    the arguments to append to the command
+     * @throws IOException if the connection cannot be established or the exchange fails
+     */
+    @Override
+    public void sendCommand(String command, String... args) throws IOException {
+        try (Socket socket = new Socket()) {
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", bootstrapPort));
+
+            StringBuilder commandBuilder = new StringBuilder(command);
+            for (String arg : args) {
+                commandBuilder.append(" ").append(arg);
+            }
+            commandBuilder.append("\n");
+
+            // Log the command name only: the arguments of the PORT command contain the secret key
+            logger.debug("Sending command to Bootstrap: {}", command);
+
+            OutputStream out = socket.getOutputStream();
+            out.write(commandBuilder.toString().getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            logger.debug("Awaiting response from Bootstrap...");
+            // Decode with the same charset that was used to encode the request
+            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
+            String response = reader.readLine();
+            if ("OK".equals(response)) {
+                logger.info("Successfully initiated communication with Bootstrap");
+            } else {
+                logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from MiNiFi");
+            }
+        }
+    }
+
+    /**
+     * Registers {@code handler} for {@code command} unless a handler for that command already exists.
+     *
+     * @param command the request type to handle
+     * @param handler receives the request arguments and the output stream of the requesting connection
+     */
+    @Override
+    public void registerMessageHandler(String command, BiConsumer<String[], OutputStream> handler) {
+        messageHandlers.putIfAbsent(command, handler);
+    }
+
+    // Installs the built-in handlers: PING, RELOAD, SHUTDOWN, DUMP, FLOW_STATUS_REPORT and ENV
+    private void registerHandlers() {
+        messageHandlers.putIfAbsent("PING", (args, outputStream) -> {
+            logger.debug("Received PING request from Bootstrap; responding");
+            echoRequestCmd("PING", outputStream);
+            logger.debug("Responded to PING request from Bootstrap");
+        });
+        messageHandlers.putIfAbsent(RELOAD, (args, outputStream) -> {
+            logger.info("Received RELOAD request from Bootstrap");
+            echoRequestCmd(RELOAD, outputStream);
+            minifiServer.stop(true);
+        });
+        messageHandlers.putIfAbsent(SHUTDOWN, (args, outputStream) -> {
+            logger.info("Received SHUTDOWN request from Bootstrap");
+            echoRequestCmd(SHUTDOWN, outputStream);
+            minifiServer.stop(false);
+        });
+        messageHandlers.putIfAbsent("DUMP", (args, outputStream) -> {
+            logger.info("Received DUMP request from Bootstrap");
+            writeDump(outputStream);
+        });
+        messageHandlers.putIfAbsent("FLOW_STATUS_REPORT", (args, outputStream) -> {
+            logger.info("Received FLOW_STATUS_REPORT request from Bootstrap");
+            if (args.length == 0) {
+                // fail with a descriptive message instead of an ArrayIndexOutOfBoundsException
+                throw new IllegalArgumentException("FLOW_STATUS_REPORT request did not include a status request string");
+            }
+            writeStatusReport(args[0], outputStream);
+        });
+        messageHandlers.putIfAbsent("ENV", (args, outputStream) -> {
+            logger.info("Received ENV request from Bootstrap");
+            writeEnv(outputStream);
+        });
+    }
+
+    /**
+     * Accept loop for inbound Bootstrap connections; each accepted connection is handled on a small
+     * fixed-size thread pool.
+     */
+    private class Listener implements Runnable {
+
+        private final ServerSocket serverSocket;
+        private final ExecutorService executor;
+        private volatile boolean stopped = false;
+
+        public Listener(ServerSocket serverSocket) {
+            this.serverSocket = serverSocket;
+            this.executor = Executors.newFixedThreadPool(LISTENER_EXECUTOR_THREAD_COUNT);
+        }
+
+        // Marks the loop as stopped, shuts the worker pool down and closes the server socket
+        public void stop() {
+            stopped = true;
+
+            executor.shutdownNow();
+
+            try {
+                serverSocket.close();
+            } catch (IOException ioe) {
+                // nothing to really do here. we could log this, but it would just become
+                // confusing in the logs, as we're shutting down and there's no real benefit
+            }
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    Socket socket;
+                    try {
+                        logger.debug("Listening for Bootstrap Requests");
+                        socket = serverSocket.accept();
+                    } catch (SocketTimeoutException ste) {
+                        // accept() timed out: re-check the stop flag and keep listening
+                        if (stopped) {
+                            return;
+                        }
+                        continue;
+                    } catch (IOException ioe) {
+                        // closing the server socket in stop() surfaces here; exit quietly in that case
+                        if (stopped) {
+                            return;
+                        }
+                        throw ioe;
+                    }
+
+                    logger.debug("Received connection from Bootstrap");
+                    socket.setSoTimeout(5000);
+
+                    executor.submit(() -> handleBootstrapRequest(socket));
+                } catch (Throwable t) {
+                    logger.error("Failed to process request from Bootstrap due to " + t, t);
+                }
+            }
+        }
+
+        // Reads one request from the socket, runs the matching handler and always closes the socket afterwards
+        private void handleBootstrapRequest(Socket socket) {
+            try {
+                BootstrapRequest request = bootstrapRequestReader.readRequest(socket.getInputStream());
+                String requestType = request.getRequestType();
+
+                BiConsumer<String[], OutputStream> handler = messageHandlers.get(requestType);
+                if (handler == null) {
+                    logger.warn("There is no handler defined for the {}", requestType);
+                } else {
+                    handler.accept(request.getArgs(), socket.getOutputStream());
+                }
+
+            } catch (Throwable t) {
+                logger.error("Failed to process request from Bootstrap due to " + t, t);
+            } finally {
+                try {
+                    socket.close();
+                } catch (IOException ioe) {
+                    logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
+                }
+            }
+        }
+
+    }
+
+    // Serializes the flow status report for the given request string as JSON onto the stream
+    private void writeStatusReport(String flowStatusRequestString, OutputStream out) throws StatusRequestException {
+        try {
+            FlowStatusReport flowStatusReport = minifiServer.getStatusReport(flowStatusRequestString);
+            objectMapper.writeValue(out, flowStatusReport);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    // Writes every JVM system property as a key=value line; closing the writer also closes the stream
+    private static void writeEnv(OutputStream out) {
+        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out))) {
+            StringBuilder sb = new StringBuilder();
+
+            System.getProperties()
+                .forEach((key, value) -> sb.append(key).append("=").append(value).append("\n"));
+
+            writer.write(sb.toString());
+            writer.flush();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    // Writes the thread dump produced by DumpUtil; the stream is flushed but left open for the caller to close
+    private void writeDump(OutputStream out) {
+        try {
+            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+            writer.write(DumpUtil.getDump());
+            writer.flush();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    // Acknowledges a request by echoing its command name back as a single line
+    private void echoRequestCmd(String cmd, OutputStream out) {
+        try {
+            out.write((cmd + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequest.java
similarity index 66%
copy from
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
copy to
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequest.java
index ea6f5a7a76..cf0b595a4e 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/MiNiFiServer.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequest.java
@@ -14,15 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.minifi;
+package org.apache.nifi.minifi.bootstrap;
-import org.apache.nifi.NiFiServer;
-import org.apache.nifi.minifi.commons.status.FlowStatusReport;
-import org.apache.nifi.minifi.status.StatusRequestException;
+public class BootstrapRequest {
+ private final String requestType;
+ private final String[] args;
-/**
- */
-public interface MiNiFiServer extends NiFiServer {
- FlowStatusReport getStatusReport(String requestString) throws
StatusRequestException;
+ /**
+ * Creates a request with the given type and arguments; the argument array is stored as-is (not copied).
+ *
+ * @param request the request type
+ * @param args the request arguments
+ */
+ public BootstrapRequest(String request, String[] args) {
+ this.requestType = request;
+ this.args = args;
+ }
+
+ /**
+ * @return the request type passed to the constructor
+ */
+ public String getRequestType() {
+ return requestType;
+ }
+
+ /**
+ * @return the argument array passed to the constructor (the same instance, not a copy)
+ */
+ public String[] getArgs() {
+ return args;
+ }
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequestReader.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequestReader.java
new file mode 100644
index 0000000000..89816d6c28
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/BootstrapRequestReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import org.apache.nifi.util.LimitingInputStream;
+
+/**
+ * Reads a single request line sent by Bootstrap and verifies the secret key it carries.
+ * <p>
+ * A request line has the space-separated form {@code <REQUEST_TYPE> <SECRET_KEY> [ARG ...]}.
+ */
+public class BootstrapRequestReader {
+    // Maximum number of bytes read for one request
+    private static final int MAX_REQUEST_BYTES = 4096;
+
+    private final String secretKey;
+
+    /**
+     * @param secretKey the key every request must present as its second token
+     */
+    public BootstrapRequestReader(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * Reads and validates one request. The stream is not closed, as the caller will do that.
+     *
+     * @param in the stream to read the request line from
+     * @return the parsed request; its argument array is empty when the line carries no arguments
+     * @throws IOException if the stream ends before a line is received, the line is blank, the secret key
+     *                     is missing or does not match, or reading fails
+     */
+    public BootstrapRequest readRequest(InputStream in) throws IOException {
+        // We want to ensure that we don't try to read data from an InputStream directly
+        // by a BufferedReader because any user on the system could open a socket and send
+        // a multi-gigabyte file without any new lines in order to crash the MiNiFi instance
+        // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
+        // So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
+        LimitingInputStream limitingIn = new LimitingInputStream(in, MAX_REQUEST_BYTES);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
+
+        String line = reader.readLine();
+        if (line == null) {
+            // readLine() returns null at end of stream, e.g. when the peer connects and closes without sending data
+            throw new IOException("Received invalid request from Bootstrap: connection was closed before a request was sent");
+        }
+
+        // A line consisting only of spaces splits into an empty array
+        String[] splits = line.split(" ");
+        if (splits.length < 1) {
+            throw new IOException("Received invalid request from Bootstrap: " + line);
+        }
+
+        String requestType = splits[0];
+        if (splits.length == 1) {
+            throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
+        }
+
+        String requestKey = splits[1];
+        if (!secretKey.equals(requestKey)) {
+            throw new IOException("Received invalid Secret Key for request type " + requestType);
+        }
+
+        // Everything after the type and the key is an argument; yields an empty array when there is none
+        String[] args = Arrays.copyOfRange(splits, 2, splits.length);
+        return new BootstrapRequest(requestType, args);
+    }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/DumpUtil.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/DumpUtil.java
new file mode 100644
index 0000000000..63553ea8fa
--- /dev/null
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/bootstrap/DumpUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Builds a plain-text thread dump of the current JVM and flags deadlocked threads.
+ */
+public class DumpUtil {
+
+    /**
+     * Produces a thread dump with one section per live thread, sorted case-insensitively by thread name.
+     * Each section lists the thread name, id and state, the lock the thread is waiting for (when known),
+     * its stack trace annotated with the monitors locked in each frame, and the synchronizers it owns.
+     * Threads taking part in a deadlock are marked, and the ids of deadlocked threads are listed at the end.
+     *
+     * @return the thread dump; never {@code null}
+     */
+    public static String getDump() {
+        ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+
+        // Only ask for details this JVM can provide: requesting unsupported monitor / synchronizer
+        // information makes the ThreadMXBean calls throw UnsupportedOperationException.
+        boolean monitorsSupported = mbean.isObjectMonitorUsageSupported();
+        boolean synchronizersSupported = mbean.isSynchronizerUsageSupported();
+
+        ThreadInfo[] infos = mbean.dumpAllThreads(monitorsSupported, synchronizersSupported);
+        long[] deadlockedThreadIds = synchronizersSupported ? mbean.findDeadlockedThreads() : null;
+        long[] monitorDeadlockThreadIds = monitorsSupported ? mbean.findMonitorDeadlockedThreads() : null;
+
+        List<ThreadInfo> sortedInfos = new ArrayList<>(Arrays.asList(infos));
+        // locale-independent, case-insensitive ordering by thread name
+        sortedInfos.sort(Comparator.comparing(ThreadInfo::getThreadName, String.CASE_INSENSITIVE_ORDER));
+
+        StringBuilder sb = new StringBuilder();
+        for (ThreadInfo info : sortedInfos) {
+            sb.append("\n");
+            sb.append("\"").append(info.getThreadName()).append("\" Id=");
+            sb.append(info.getThreadId()).append(" ");
+            sb.append(info.getThreadState().toString()).append(" ");
+
+            switch (info.getThreadState()) {
+                case BLOCKED:
+                case TIMED_WAITING:
+                case WAITING:
+                    // A waiting thread is not necessarily waiting on a lock (e.g. Thread.sleep)
+                    LockInfo waitingFor = info.getLockInfo();
+                    if (waitingFor != null) {
+                        sb.append(" on ");
+                        sb.append(waitingFor);
+                    }
+                    break;
+                default:
+                    break;
+            }
+
+            if (info.isSuspended()) {
+                sb.append(" (suspended)");
+            }
+            if (info.isInNative()) {
+                sb.append(" (in native code)");
+            }
+
+            if (containsId(deadlockedThreadIds, info.getThreadId())) {
+                sb.append(" ** DEADLOCKED THREAD **");
+            }
+            if (containsId(monitorDeadlockThreadIds, info.getThreadId())) {
+                sb.append(" ** MONITOR-DEADLOCKED THREAD **");
+            }
+
+            MonitorInfo[] monitors = info.getLockedMonitors();
+            for (StackTraceElement element : info.getStackTrace()) {
+                sb.append("\n\tat ").append(element);
+
+                for (MonitorInfo monitor : monitors) {
+                    // these are monitors the thread HOLDS, acquired in this frame;
+                    // the locking frame may be unavailable (null), hence the argument order
+                    if (element.equals(monitor.getLockedStackFrame())) {
+                        sb.append("\n\t- locked ").append(monitor);
+                    }
+                }
+            }
+
+            LockInfo[] lockInfos = info.getLockedSynchronizers();
+            if (lockInfos.length > 0) {
+                sb.append("\n\t");
+                sb.append("Number of Locked Synchronizes: ").append(lockInfos.length);
+                for (LockInfo lockInfo : lockInfos) {
+                    sb.append("\n\t- ").append(lockInfo.toString());
+                }
+            }
+
+            sb.append("\n");
+        }
+
+        if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
+            sb.append("\n\nDEADLOCK DETECTED");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for (long id : deadlockedThreadIds) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+            sb.append("\n\nMONITOR DEADLOCK DETECTED");
+            sb.append("\nThe following thread IDs are deadlocked:");
+            for (long id : monitorDeadlockThreadIds) {
+                sb.append("\n").append(id);
+            }
+        }
+
+        return sb.toString();
+    }
+
+    // Returns true when ids is non-null and contains id
+    private static boolean containsId(long[] ids, long id) {
+        if (ids == null) {
+            return false;
+        }
+        for (long candidate : ids) {
+            if (candidate == id) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
index 4cd491691c..16aeba7871 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
@@ -16,25 +16,107 @@
*/
package org.apache.nifi.minifi;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import org.apache.nifi.headless.HeadlessNiFiServer;
+import org.apache.nifi.minifi.bootstrap.BootstrapListener;
+import org.apache.nifi.minifi.c2.C2NiFiProperties;
+import org.apache.nifi.minifi.c2.C2NifiClientService;
import org.apache.nifi.minifi.commons.status.FlowStatusReport;
import org.apache.nifi.minifi.status.StatusConfigReporter;
import org.apache.nifi.minifi.status.StatusRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
*
*/
public class StandardMiNiFiServer extends HeadlessNiFiServer implements
MiNiFiServer {
private static final Logger logger =
LoggerFactory.getLogger(StandardMiNiFiServer.class);
+ public static final String BOOTSTRAP_PORT_PROPERTY =
"nifi.bootstrap.listen.port";
+
+ private BootstrapListener bootstrapListener;
+
+ /* A reference to the client service for handling*/
+ private C2NifiClientService c2NifiClientService;
+
public StandardMiNiFiServer() {
super();
}
public FlowStatusReport getStatusReport(String requestString) throws
StatusRequestException {
- return StatusConfigReporter.getStatus(this.flowController,
requestString, logger);
+ return StatusConfigReporter.getStatus(flowController, requestString,
logger);
+ }
+
+ /**
+ * Starts the underlying headless server, then the Bootstrap listener and the C2 client
+ * (each only when configured), and finally reports the started status to Bootstrap.
+ */
+ @Override
+ public void start() {
+ super.start();
+
+ initBootstrapListener();
+ initC2();
+
+ // sent last, after the listener and the C2 client have been initialized
+ sendStartedStatus();
+ }
+
+    /**
+     * Stops the underlying headless server, notifies Bootstrap (RELOAD or SHUTDOWN) and stops the C2 client.
+     * The C2 client is stopped even when notifying Bootstrap fails.
+     *
+     * @param reload {@code true} to request a reload from Bootstrap, {@code false} to request a shutdown
+     * @throws UncheckedIOException if Bootstrap cannot be notified
+     */
+    @Override
+    public void stop(boolean reload) {
+        super.stop();
+        try {
+            if (bootstrapListener != null) {
+                if (reload) {
+                    bootstrapListener.reload();
+                } else {
+                    bootstrapListener.stop();
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } finally {
+            // must run even if Bootstrap could not be reached, otherwise the C2 client is never stopped
+            if (c2NifiClientService != null) {
+                c2NifiClientService.stop();
+            }
+        }
+    }
+
+    /**
+     * Creates and starts the C2 client when the C2 enable property is set to {@code true};
+     * otherwise leaves the client reference unset.
+     */
+    private void initC2() {
+        final boolean c2Enabled = Boolean.parseBoolean(props.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"));
+        if (!c2Enabled) {
+            logger.debug("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);
+            c2NifiClientService = null;
+            return;
+        }
+
+        logger.info("C2 enabled, creating a C2 client instance");
+        c2NifiClientService = new C2NifiClientService(props, flowController);
+        c2NifiClientService.start();
+    }
+
+    /**
+     * Creates and starts the {@link BootstrapListener} when the {@value #BOOTSTRAP_PORT_PROPERTY} system
+     * property is set; without the property no listener is created.
+     *
+     * @throws RuntimeException     if the property is not an integer in the range 1 - 65535
+     * @throws UncheckedIOException if the listener cannot be started
+     */
+    private void initBootstrapListener() {
+        String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
+        if (bootstrapPort == null) {
+            logger.info("MiNiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
+            bootstrapListener = null;
+            return;
+        }
+
+        int port;
+        try {
+            port = Integer.parseInt(bootstrapPort);
+        } catch (NumberFormatException nfe) {
+            throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535", nfe);
+        }
+        if (port < 1 || port > 65535) {
+            throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+        }
+
+        try {
+            bootstrapListener = new BootstrapListener(this, port);
+            bootstrapListener.start();
+        } catch (IOException ioe) {
+            // a communication failure is not an invalid port value: report it as such and keep the cause
+            throw new UncheckedIOException("Failed to start MiNiFi because the Bootstrap Listener could not be started for Bootstrap Port " + port, ioe);
+        }
+    }
+
+    /**
+     * Reports a successful start to Bootstrap; does nothing when no Bootstrap listener exists.
+     *
+     * @throws UncheckedIOException if the status cannot be sent
+     */
+    private void sendStartedStatus() {
+        if (bootstrapListener == null) {
+            return;
+        }
+
+        try {
+            bootstrapListener.sendStartedStatus(true);
+        } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+        }
+    }
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
similarity index 52%
copy from
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
copy to
nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
index 2a418ede56..41784233c7 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-api/src/main/java/org/apache/nifi/minifi/status/StatusRequestException.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/bootstrap/BootstrapCommunicator.java
@@ -14,25 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.minifi.status;
-public class StatusRequestException extends Exception {
+package org.apache.nifi.bootstrap;
- private static final long serialVersionUID = 1L;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.BiConsumer;
- public StatusRequestException() {
- super();
- }
+public interface BootstrapCommunicator {
- public StatusRequestException(final String message) {
- super(message);
- }
+ /**
+ * Sends a command with specific arguments to the bootstrap process
+ * @param command the command to send
+ * @param args the args to send
+ * @throws IOException exception in case of communication issue
+ */
+ void sendCommand(String command, String... args) throws IOException;
- public StatusRequestException(final Throwable t) {
- super(t);
- }
-
- public StatusRequestException(final String message, final Throwable t) {
- super(message, t);
- }
+ /**
+ * Register a handler for messages coming from bootstrap process
+ * @param command the command
+ * @param handler handler for the specific command
+ */
+ void registerMessageHandler(String command, BiConsumer<String[],
OutputStream> handler);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index e730b3893c..a5ca978960 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -184,11 +184,6 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>c2-client-service</artifactId>
- <version>1.19.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index e0d647f521..6a99df2b2c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -21,8 +21,6 @@ import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
import org.apache.nifi.authorization.ManagedAuthorizer;
import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.c2.C2NiFiProperties;
-import org.apache.nifi.c2.C2NifiClientService;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@@ -147,9 +145,6 @@ public class StandardFlowService implements FlowService,
ProtocolHandler {
*/
private NodeIdentifier nodeId;
- /* A reference to the client service for handling*/
- private C2NifiClientService c2NifiClientService;
-
// guardedBy rwLock
private boolean firstControllerInitialization = true;
@@ -295,17 +290,6 @@ public class StandardFlowService implements FlowService,
ProtocolHandler {
if (configuredForClustering) {
senderListener.start();
- } else {
- // If standalone and C2 is enabled, create a C2 client
- final boolean c2Enabled =
Boolean.parseBoolean(nifiProperties.getProperty(C2NiFiProperties.C2_ENABLE_KEY,
"false"));
- if (c2Enabled) {
- logger.info("C2 enabled, creating a C2 client instance");
- c2NifiClientService = new
C2NifiClientService(nifiProperties, this.controller);
- c2NifiClientService.start();
- } else {
- logger.debug("C2 Property [{}] missing or disabled: C2
client not created", C2NiFiProperties.C2_ENABLE_KEY);
- c2NifiClientService = null;
- }
}
} catch (final IOException ioe) {
@@ -331,10 +315,6 @@ public class StandardFlowService implements FlowService,
ProtocolHandler {
running.set(false);
- if (c2NifiClientService != null) {
- c2NifiClientService.stop();
- }
-
if (clusterCoordinator != null) {
try {
clusterCoordinator.shutdown();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index c458546b8f..401767e814 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.headless;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.List;
+import java.util.Set;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.impl.StandardAuditService;
@@ -28,7 +32,6 @@ import
org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowSerializationStrategy;
@@ -65,11 +68,6 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedWriter;
-import java.io.OutputStreamWriter;
-import java.util.List;
-import java.util.Set;
-
/**
*
*/
@@ -227,10 +225,6 @@ public class HeadlessNiFiServer implements NiFiServer {
return null;
}
- protected C2Client getC2Client() {
- return null;
- }
-
public void stop() {
try {
flowService.stop(false);