This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 6541eac NIFI-6598 Storing peers into managed-state - Fixed checkstyle
errors. - Added PeerPersistence interface. - Expose RemoteProcessGroup state
via REST API - Made stateManager transient.
6541eac is described below
commit 6541eac625ff1d721867152d728c53483eab822a
Author: Koji Kawamura <[email protected]>
AuthorDate: Tue Aug 27 16:53:02 2019 +0900
NIFI-6598 Storing peers into managed-state
- Fixed checkstyle errors.
- Added PeerPersistence interface.
- Expose RemoteProcessGroup state via REST API
- Made stateManager transient.
This closes #3677.
Signed-off-by: Bryan Bende <[email protected]>
---
.../java/org/apache/nifi/util/NiFiProperties.java | 12 --
.../remote/client/AbstractPeerPersistence.java | 93 +++++++++++++++
.../nifi/remote/client/FilePeerPersistence.java | 55 +++++++++
.../PeerPersistence.java} | 28 +----
.../apache/nifi/remote/client/PeerSelector.java | 100 +++++-----------
.../nifi/remote/client/PeerStatusProvider.java | 7 ++
.../nifi/remote/client/SiteToSiteClient.java | 54 ++++++++-
.../nifi/remote/client/SiteToSiteClientConfig.java | 11 ++
.../nifi/remote/client/StatePeerPersistence.java | 65 +++++++++++
.../apache/nifi/remote/client/http/HttpClient.java | 8 +-
.../client/socket/EndpointConnectionPool.java | 15 ++-
.../nifi/remote/client/socket/SocketClient.java | 2 +-
.../apache/nifi/remote/util/PeerStatusCache.java | 14 ++-
.../nifi/remote/client/TestPeerSelector.java | 128 +++++++++++++++++++++
.../remote/client/socket/TestSiteToSiteClient.java | 38 +++++-
.../org/apache/nifi/groups/RemoteProcessGroup.java | 3 +
.../nifi/controller/flow/StandardFlowManager.java | 4 +-
.../apache/nifi/groups/StandardProcessGroup.java | 8 ++
.../nifi/remote/StandardRemoteProcessGroup.java | 22 ++--
.../nifi/remote/StandardRemoteGroupPort.java | 9 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 8 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 10 ++
.../nifi/web/api/RemoteProcessGroupResource.java | 57 +++++++++
.../org/apache/nifi/web/dao/ComponentStateDAO.java | 10 ++
.../apache/nifi/web/dao/RemoteProcessGroupDAO.java | 10 ++
.../web/dao/impl/StandardComponentStateDAO.java | 6 +
.../dao/impl/StandardRemoteProcessGroupDAO.java | 14 +++
.../src/main/resources/nifi-web-api-context.xml | 1 +
28 files changed, 652 insertions(+), 140 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index afcd268..0315fdf 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -77,7 +77,6 @@ public abstract class NiFiProperties {
public static final String REMOTE_CONTENTS_CACHE_EXPIRATION =
"nifi.remote.contents.cache.expiration";
public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
public static final String ADMINISTRATIVE_YIELD_DURATION =
"nifi.administrative.yield.duration";
- public static final String PERSISTENT_STATE_DIRECTORY =
"nifi.persistent.state.directory";
public static final String BORED_YIELD_DURATION =
"nifi.bored.yield.duration";
public static final String PROCESSOR_SCHEDULING_TIMEOUT =
"nifi.processor.scheduling.timeout";
public static final String BACKPRESSURE_COUNT =
"nifi.queue.backpressure.count";
@@ -272,7 +271,6 @@ public abstract class NiFiProperties {
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30
sec";
- public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY =
"./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY =
"5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
@@ -729,16 +727,6 @@ public abstract class NiFiProperties {
DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT);
}
- public File getPersistentStateDirectory() {
- final String dirName = getProperty(PERSISTENT_STATE_DIRECTORY,
- DEFAULT_PERSISTENT_STATE_DIRECTORY);
- final File file = new File(dirName);
- if (!file.exists()) {
- file.mkdirs();
- }
- return file;
- }
-
// getters for cluster node properties //
public boolean isNode() {
return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
new file mode 100644
index 0000000..758b425
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractPeerPersistence.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public abstract class AbstractPeerPersistence implements PeerPersistence {
+
+ protected Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected PeerStatusCache restorePeerStatuses(final BufferedReader reader,
+ long cachedTimestamp) throws IOException {
+ final SiteToSiteTransportProtocol transportProtocol;
+ try {
+ transportProtocol =
SiteToSiteTransportProtocol.valueOf(reader.readLine());
+ } catch (IllegalArgumentException e) {
+ logger.info("Discard stored peer statuses in {} because transport
protocol is not stored",
+ this.getClass().getSimpleName());
+ return null;
+ }
+
+ final Set<PeerStatus> restoredStatuses = readPeerStatuses(reader);
+
+ if (!restoredStatuses.isEmpty()) {
+ logger.info("Restored peer statuses from {} {}",
this.getClass().getSimpleName(), restoredStatuses);
+ return new PeerStatusCache(restoredStatuses, cachedTimestamp,
transportProtocol);
+ }
+
+ return null;
+ }
+
+ private Set<PeerStatus> readPeerStatuses(final BufferedReader reader)
throws IOException {
+ final Set<PeerStatus> statuses = new HashSet<>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ final String[] splits = line.split(Pattern.quote(":"));
+ if (splits.length != 3 && splits.length != 4) {
+ continue;
+ }
+
+ final String hostname = splits[0];
+ final int port = Integer.parseInt(splits[1]);
+ final boolean secure = Boolean.parseBoolean(splits[2]);
+
+ final boolean supportQueryForPeer = splits.length == 4 &&
Boolean.parseBoolean(splits[3]);
+
+ statuses.add(new PeerStatus(new PeerDescription(hostname, port,
secure), 1, supportQueryForPeer));
+ }
+
+ return statuses;
+ }
+
+
+ @FunctionalInterface
+ protected interface IOConsumer<T> {
+ void accept(T value) throws IOException;
+ }
+
+ protected void write(final PeerStatusCache peerStatusCache, final
IOConsumer<String> consumer) throws IOException {
+ consumer.accept(peerStatusCache.getTransportProtocol().name() + "\n");
+ for (final PeerStatus status : peerStatusCache.getStatuses()) {
+ final PeerDescription description = status.getPeerDescription();
+ final String line = description.getHostname() + ":" +
description.getPort() + ":" + description.isSecure() + ":" +
status.isQueryForPeers() + "\n";
+ consumer.accept(line);
+ }
+ }
+
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
new file mode 100644
index 0000000..8fa167d
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/FilePeerPersistence.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+public class FilePeerPersistence extends AbstractPeerPersistence {
+
+ private final File persistenceFile;
+
+ public FilePeerPersistence(File persistenceFile) {
+ this.persistenceFile = persistenceFile;
+ }
+
+ @Override
+ public void save(final PeerStatusCache peerStatusCache) throws IOException
{
+ try (final OutputStream fos = new FileOutputStream(persistenceFile);
+ final OutputStream out = new BufferedOutputStream(fos)) {
+ write(peerStatusCache, line ->
out.write(line.getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+
+ @Override
+ public PeerStatusCache restore() throws IOException {
+ try (final InputStream fis = new FileInputStream(persistenceFile);
+ final BufferedReader reader = new BufferedReader(new
InputStreamReader(fis))) {
+ return restorePeerStatuses(reader, persistenceFile.lastModified());
+ }
+ }
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
similarity index 56%
copy from
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
copy to
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
index c52b4b7..5698247 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerPersistence.java
@@ -14,31 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.remote.util;
+package org.apache.nifi.remote.client;
-import java.util.Set;
+import org.apache.nifi.remote.util.PeerStatusCache;
-import org.apache.nifi.remote.PeerStatus;
+import java.io.IOException;
-public class PeerStatusCache {
+public interface PeerPersistence {
- private final Set<PeerStatus> statuses;
- private final long timestamp;
+ void save(final PeerStatusCache peerStatusCache) throws IOException;
- public PeerStatusCache(final Set<PeerStatus> statuses) {
- this(statuses, System.currentTimeMillis());
- }
-
- public PeerStatusCache(final Set<PeerStatus> statuses, final long
timestamp) {
- this.statuses = statuses;
- this.timestamp = timestamp;
- }
-
- public Set<PeerStatus> getStatuses() {
- return statuses;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
+ PeerStatusCache restore() throws IOException;
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 14c163b..8235a38 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -21,20 +21,12 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -48,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.remote.util.EventReportUtil.error;
@@ -67,7 +58,7 @@ public class PeerSelector {
private volatile long peerRefreshTime = 0L;
private final AtomicLong peerIndex = new AtomicLong(0L);
private volatile PeerStatusCache peerStatusCache;
- private final File persistenceFile;
+ private final PeerPersistence peerPersistence;
private EventReporter eventReporter;
@@ -90,70 +81,41 @@ public class PeerSelector {
this.systemTime = systemTime;
}
- public PeerSelector(final PeerStatusProvider peerStatusProvider, final
File persistenceFile) {
+ public PeerSelector(final PeerStatusProvider peerStatusProvider, final
PeerPersistence peerPersistence) {
this.peerStatusProvider = peerStatusProvider;
- this.persistenceFile = persistenceFile;
- Set<PeerStatus> recoveredStatuses;
- if (persistenceFile != null && persistenceFile.exists()) {
- try {
- recoveredStatuses =
recoverPersistedPeerStatuses(persistenceFile);
- this.peerStatusCache = new PeerStatusCache(recoveredStatuses,
persistenceFile.lastModified());
- } catch (final IOException ioe) {
- logger.warn("Failed to recover peer statuses from {} due to
{}; will continue without loading information from file", persistenceFile, ioe);
- }
- } else {
- peerStatusCache = null;
- }
- }
-
- private void persistPeerStatuses(final Set<PeerStatus> statuses) {
- if (persistenceFile == null) {
- return;
- }
+ this.peerPersistence = peerPersistence;
- try (final OutputStream fos = new FileOutputStream(persistenceFile);
- final OutputStream out = new BufferedOutputStream(fos)) {
-
- for (final PeerStatus status : statuses) {
- final PeerDescription description =
status.getPeerDescription();
- final String line = description.getHostname() + ":" +
description.getPort() + ":" + description.isSecure() + ":" +
status.isQueryForPeers() + "\n";
- out.write(line.getBytes(StandardCharsets.UTF_8));
+ try {
+ PeerStatusCache restoredPeerStatusCache = null;
+ if (peerPersistence != null) {
+ restoredPeerStatusCache = peerPersistence.restore();
+ if (restoredPeerStatusCache != null) {
+ final SiteToSiteTransportProtocol currentProtocol =
peerStatusProvider.getTransportProtocol();
+ final SiteToSiteTransportProtocol cachedProtocol =
restoredPeerStatusCache.getTransportProtocol();
+ if (!currentProtocol.equals(cachedProtocol)) {
+ logger.info("Discard stored peer statuses in {}
because transport protocol has changed from {} to {}",
+ peerPersistence.getClass().getSimpleName(),
cachedProtocol, currentProtocol);
+ restoredPeerStatusCache = null;
+ }
+ }
}
+ this.peerStatusCache = restoredPeerStatusCache;
- } catch (final IOException e) {
- error(logger, eventReporter, "Failed to persist list of Peers due
to {}; if restarted and peer's NCM is down," +
- " may be unable to transfer data until communications with
NCM are restored", e.toString());
- logger.error("", e);
+ } catch (final IOException ioe) {
+ logger.warn("Failed to recover peer statuses from {} due to {};
will continue without loading information from file",
+ peerPersistence.getClass().getSimpleName(), ioe);
}
}
- private static Set<PeerStatus> recoverPersistedPeerStatuses(final File
file) throws IOException {
- if (!file.exists()) {
- return null;
- }
-
- final Set<PeerStatus> statuses = new HashSet<>();
- try (final InputStream fis = new FileInputStream(file);
- final BufferedReader reader = new BufferedReader(new
InputStreamReader(fis))) {
-
- String line;
- while ((line = reader.readLine()) != null) {
- final String[] splits = line.split(Pattern.quote(":"));
- if (splits.length != 3 && splits.length != 4) {
- continue;
- }
-
- final String hostname = splits[0];
- final int port = Integer.parseInt(splits[1]);
- final boolean secure = Boolean.parseBoolean(splits[2]);
-
- final boolean supportQueryForPeer = splits.length == 4 &&
Boolean.parseBoolean(splits[3]);
-
- statuses.add(new PeerStatus(new PeerDescription(hostname,
port, secure), 1, supportQueryForPeer));
- }
+ private void persistPeerStatuses() {
+ try {
+ peerPersistence.save(peerStatusCache);
+ } catch (final IOException e) {
+ error(logger, eventReporter, "Failed to persist list of Peers due
to {}; if restarted" +
+ " and the nodes specified at the RPG are down," +
+ " may be unable to transfer data until communications with
those nodes are restored", e.toString());
+ logger.error("", e);
}
-
- return statuses;
}
List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses,
final TransferDirection direction) {
@@ -340,8 +302,8 @@ public class PeerSelector {
try {
final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
- persistPeerStatuses(statuses);
- peerStatusCache = new PeerStatusCache(statuses);
+ peerStatusCache = new PeerStatusCache(statuses,
System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+ persistPeerStatuses();
logger.info("{} Successfully refreshed Peer Status; remote
instance consists of {} peers", this, statuses.size());
} catch (Exception e) {
warn(logger, eventReporter, "{} Unable to refresh Remote Group's
peers due to {}", this, e.getMessage());
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
index 817bccf..64fd161 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.io.IOException;
import java.util.Set;
@@ -57,4 +58,10 @@ public interface PeerStatusProvider {
*/
Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription
peerDescription) throws IOException;
+ /**
+ * Returns the transport protocol being used.
+ * @return the transport protocol
+ */
+ SiteToSiteTransportProtocol getTransportProtocol();
+
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 29eb465..b805d03 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote.client;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -67,7 +68,7 @@ import java.util.concurrent.TimeUnit;
* interaction with the remote instance takes place. After data has been
* exchanged or it is determined that no data is available, the Transaction can
* then be canceled (via the {@link Transaction#cancel(String)} method) or can
- * be completed (via the {@link Transaction#complete(boolean)} method).
+ * be completed (via the {@link Transaction#complete()} method).
* </p>
*
* <p>
@@ -164,6 +165,7 @@ public interface SiteToSiteClient extends Closeable {
private KeystoreType truststoreType;
private EventReporter eventReporter = EventReporter.NO_OP;
private File peerPersistenceFile;
+ private StateManager stateManager;
private boolean useCompression;
private String portName;
private String portIdentifier;
@@ -482,8 +484,8 @@ public interface SiteToSiteClient extends Closeable {
/**
* Specifies a file that the client can write to in order to persist
the
* list of nodes in the remote cluster and recover the list of nodes
- * upon restart. This allows the client to function if the remote
- * Cluster Manager is unavailable, even after a restart of the client
+ * upon restart. This allows the client to function if the remote nodes
+ * specified by the urls are unavailable, even after a restart of the
client
* software. If not specified, the list of nodes will not be persisted
* and a failure of the Cluster Manager will result in not being able
to
* communicate with the remote instance if a new client is created.
@@ -497,6 +499,32 @@ public interface SiteToSiteClient extends Closeable {
}
/**
+ * <p>Specifies StateManager that the client can persist the
+ * list of nodes in the remote cluster and recover the list of nodes
+ * upon restart. This allows the client to function if the remote nodes
+ * specified by the urls are unavailable, even after a restart of the
client
+ * software. If not specified, the list of nodes will not be persisted
+ * and a failure of the Cluster Manager will result in not being able
to
+ * communicate with the remote instance if a new client is created.</p>
+ * <p>Using a StateManager is preferable over using a File to persist
the list of nodes
+ * if the SiteToSiteClient is used by a NiFi component having access
to a NiFi context.
+ * So that the list of nodes can be persisted in the same manner with
other stateful information.</p>
+ * <p>Since StateManager is not serializable, the specified
StateManager
+ * will not be serialized, and a de-serialized SiteToSiteClientConfig
+ * instance will not have StateManager even if the original config has
one.
+ * Use {@link #peerPersistenceFile(File)} instead
+ * if the same SiteToSiteClientConfig needs to be distributed among
multiple
+ * clients via serialization, and also persistent connectivity is
required
+ * in case of having no available remote node specified by the urls
when a client restarts.</p>
+ * @param stateManager state manager
+ * @return the builder
+ */
+ public Builder stateManager(final StateManager stateManager) {
+ this.stateManager = stateManager;
+ return this;
+ }
+
+ /**
* Specifies whether or not data should be compressed before being
* transferred to or from the remote instance.
*
@@ -748,6 +776,7 @@ public interface SiteToSiteClient extends Closeable {
private final KeystoreType truststoreType;
private final EventReporter eventReporter;
private final File peerPersistenceFile;
+ private final transient StateManager stateManager;
private final boolean useCompression;
private final SiteToSiteTransportProtocol transportProtocol;
private final String portName;
@@ -773,6 +802,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = null;
this.eventReporter = null;
this.peerPersistenceFile = null;
+ this.stateManager = null;
this.useCompression = false;
this.portName = null;
this.portIdentifier = null;
@@ -801,6 +831,7 @@ public interface SiteToSiteClient extends Closeable {
this.truststoreType = builder.truststoreType;
this.eventReporter = builder.eventReporter;
this.peerPersistenceFile = builder.peerPersistenceFile;
+ this.stateManager = builder.stateManager;
this.useCompression = builder.useCompression;
this.portName = builder.portName;
this.portIdentifier = builder.portIdentifier;
@@ -922,6 +953,23 @@ public interface SiteToSiteClient extends Closeable {
}
@Override
+ public StateManager getStateManager() {
+ return stateManager;
+ }
+
+ @Override
+ public PeerPersistence getPeerPersistence() {
+ if (stateManager != null) {
+ return new StatePeerPersistence(stateManager);
+
+ } else if (peerPersistenceFile != null) {
+ return new FilePeerPersistence(peerPersistenceFile);
+ }
+
+ return null;
+ }
+
+ @Override
public EventReporter getEventReporter() {
return eventReporter;
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 8da5e70..604e078 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -111,6 +112,16 @@ public interface SiteToSiteClientConfig extends
Serializable {
File getPeerPersistenceFile();
/**
+ * @return the StateManager to be used for persisting the nodes of a remote
+ */
+ StateManager getStateManager();
+
+ /**
+ * @return a PeerPersistence implementation based on configured persistent
target
+ */
+ PeerPersistence getPeerPersistence();
+
+ /**
* @return a boolean indicating whether or not compression will be used to
* transfer data to and from the remote instance
*/
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
new file mode 100644
index 0000000..185e8fd
--- /dev/null
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/StatePeerPersistence.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.remote.util.PeerStatusCache;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StatePeerPersistence extends AbstractPeerPersistence {
+
+ static final String STATE_KEY_PEERS = "peers";
+ static final String STATE_KEY_TRANSPORT_PROTOCOL = "protocol";
+ static final String STATE_KEY_PEERS_TIMESTAMP = "peers.ts";
+
+ private final StateManager stateManager;
+
+ public StatePeerPersistence(StateManager stateManager) {
+ this.stateManager = stateManager;
+ }
+
+ @Override
+ public void save(final PeerStatusCache peerStatusCache) throws IOException
{
+ final StateMap state = stateManager.getState(Scope.LOCAL);
+ final Map<String, String> stateMap = state.toMap();
+ final Map<String, String> updatedStateMap = new HashMap<>(stateMap);
+ final StringBuilder peers = new StringBuilder();
+ write(peerStatusCache, peers::append);
+ updatedStateMap.put(STATE_KEY_PEERS, peers.toString());
+ updatedStateMap.put(STATE_KEY_PEERS_TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
+ stateManager.setState(updatedStateMap, Scope.LOCAL);
+ }
+
+ @Override
+ public PeerStatusCache restore() throws IOException {
+ final StateMap state = stateManager.getState(Scope.LOCAL);
+ final String storedPeers = state.get(STATE_KEY_PEERS);
+ if (storedPeers != null && !storedPeers.isEmpty()) {
+ try (final BufferedReader reader = new BufferedReader(new
StringReader(storedPeers))) {
+ return restorePeerStatuses(reader,
Long.parseLong(state.get(STATE_KEY_PEERS_TIMESTAMP)));
+ }
+ }
+ return null;
+ }
+}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index e1516d2..690cdfd 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -34,6 +34,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.remote.PeerDTO;
@@ -64,7 +65,7 @@ public class HttpClient extends AbstractSiteToSiteClient
implements PeerStatusPr
public HttpClient(final SiteToSiteClientConfig config) {
super(config);
- peerSelector = new PeerSelector(this, config.getPeerPersistenceFile());
+ peerSelector = new PeerSelector(this, config.getPeerPersistence());
peerSelector.setEventReporter(config.getEventReporter());
taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory()
{
@@ -246,4 +247,9 @@ public class HttpClient extends AbstractSiteToSiteClient
implements PeerStatusPr
transaction.getCommunicant().getCommunicationsSession().interrupt();
}
}
+
+ @Override
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return SiteToSiteTransportProtocol.HTTP;
+ }
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 17d66da..53bd963 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerPersistence;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
@@ -36,6 +37,7 @@ import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
@@ -44,7 +46,6 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -91,8 +92,9 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
private final InetAddress localAddress;
public EndpointConnectionPool(final RemoteDestination remoteDestination,
final int commsTimeoutMillis, final int idleExpirationMillis,
- final SSLContext sslContext, final EventReporter eventReporter, final
File persistenceFile, final SiteInfoProvider siteInfoProvider,
- final InetAddress localAddress) {
+ final SSLContext sslContext, final
EventReporter eventReporter,
+ final PeerPersistence peerPersistence, final
SiteInfoProvider siteInfoProvider,
+ final InetAddress localAddress) {
Objects.requireNonNull(remoteDestination, "Remote Destination/Port
Identifier cannot be null");
this.remoteDestination = remoteDestination;
@@ -104,7 +106,7 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
this.siteInfoProvider = siteInfoProvider;
- peerSelector = new PeerSelector(this, persistenceFile);
+ peerSelector = new PeerSelector(this, peerPersistence);
peerSelector.setEventReporter(eventReporter);
// Initialize a scheduled executor and run some maintenance tasks in
the background to kill off old, unused
@@ -563,5 +565,8 @@ public class EndpointConnectionPool implements
PeerStatusProvider {
}
}
-
+ @Override
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return SiteToSiteTransportProtocol.RAW;
+ }
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 0999d57..ff8e0d6 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
createRemoteDestination(config.getPortIdentifier(),
config.getPortName()),
commsTimeout,
(int)
config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
- config.getSslContext(), config.getEventReporter(),
config.getPeerPersistenceFile(),
+ config.getSslContext(), config.getEventReporter(),
config.getPeerPersistence(),
siteInfoProvider, config.getLocalAddress()
);
diff --git
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
index c52b4b7..acca34e 100644
---
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
+++
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -19,19 +19,19 @@ package org.apache.nifi.remote.util;
import java.util.Set;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
public class PeerStatusCache {
private final Set<PeerStatus> statuses;
private final long timestamp;
+ private final SiteToSiteTransportProtocol transportProtocol;
- public PeerStatusCache(final Set<PeerStatus> statuses) {
- this(statuses, System.currentTimeMillis());
- }
-
- public PeerStatusCache(final Set<PeerStatus> statuses, final long
timestamp) {
+ public PeerStatusCache(final Set<PeerStatus> statuses, final long
timestamp,
+ final SiteToSiteTransportProtocol
transportProtocol) {
this.statuses = statuses;
this.timestamp = timestamp;
+ this.transportProtocol = transportProtocol;
}
public Set<PeerStatus> getStatuses() {
@@ -41,4 +41,8 @@ public class PeerStatusCache {
public long getTimestamp() {
return timestamp;
}
+
+ public SiteToSiteTransportProtocol getTransportProtocol() {
+ return transportProtocol;
+ }
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index e29efd8..72dd9a6 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -16,21 +16,34 @@
*/
package org.apache.nifi.remote.client;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
@@ -40,8 +53,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
public class TestPeerSelector {
@@ -255,4 +271,116 @@ public class TestPeerSelector {
assert(!peers.isEmpty());
assertEquals("Node1 should be returned since node 1 is the only
available node.", bootstrapNode, peers.get(0).getPeerDescription());
}
+
+ @Test
+ public void testPeerStatusManagedCache() throws Exception {
+ final PeerStatusProvider peerStatusProvider =
Mockito.mock(PeerStatusProvider.class);
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final StateMap stateMap = Mockito.mock(StateMap.class);
+ final Map<String, String> state = new HashMap<>();
+ state.put(StatePeerPersistence.STATE_KEY_PEERS,
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
+
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+ when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+ when(stateMap.get(anyString())).thenAnswer(invocation ->
state.get(invocation.getArgument(0)));
+ doAnswer(invocation -> {
+ final Map<String, String> updatedMap = invocation.getArgument(0);
+ state.clear();
+ state.putAll(updatedMap);
+ return null;
+ }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0",
8081, false);
+
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1,
true)));
+
+ // PeerSelector should restore peer statuses from managed cache.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new
StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should be used",
+ "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n",
stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+
+ // If the stored state is too old, PeerSelector refreshes peers.
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP,
String.valueOf(System.currentTimeMillis() - 120_000));
+ peerSelector = new PeerSelector(peerStatusProvider, new
StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Peers should be refreshed",
+ "RAW\nnifi0:8081:false:true\n",
stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+ }
+
+ @Test
+ public void testPeerStatusManagedCacheDifferentProtocol() throws Exception
{
+ final PeerStatusProvider peerStatusProvider =
Mockito.mock(PeerStatusProvider.class);
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final StateMap stateMap = Mockito.mock(StateMap.class);
+ final Map<String, String> state = new HashMap<>();
+ state.put(StatePeerPersistence.STATE_KEY_PEERS,
"RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n");
+ state.put(StatePeerPersistence.STATE_KEY_PEERS_TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
+
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
+ when(stateManager.getState(eq(Scope.LOCAL))).thenReturn(stateMap);
+ when(stateMap.get(anyString())).thenAnswer(invocation ->
state.get(invocation.getArgument(0)));
+ doAnswer(invocation -> {
+ final Map<String, String> updatedMap = invocation.getArgument(0);
+ state.clear();
+ state.putAll(updatedMap);
+ return null;
+ }).when(stateManager).setState(any(), eq(Scope.LOCAL));
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0",
8081, false);
+
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1,
true)));
+
+ // PeerSelector should NOT restore peer statuses from managed cache
because protocol changed.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new
StatePeerPersistence(stateManager));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should NOT be used",
+ "HTTP\nnifi0:8081:false:true\n",
stateMap.get(StatePeerPersistence.STATE_KEY_PEERS));
+ }
+
+ @Test
+ public void testPeerStatusFileCache() throws Exception {
+ final PeerStatusProvider peerStatusProvider =
Mockito.mock(PeerStatusProvider.class);
+
+ final PeerDescription bootstrapPeer = new PeerDescription("nifi0",
8081, false);
+
when(peerStatusProvider.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
+
when(peerStatusProvider.getBootstrapPeerDescription()).thenReturn(bootstrapPeer);
+ when(peerStatusProvider.fetchRemotePeerStatuses(eq(bootstrapPeer)))
+ .thenReturn(Collections.singleton(new PeerStatus(bootstrapPeer, 1,
true)));
+
+ final File file = File.createTempFile("peers", "txt");
+ file.deleteOnExit();
+
+ try (final FileOutputStream fos = new FileOutputStream(file)) {
+
fos.write("RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n".getBytes(StandardCharsets.UTF_8));
+ }
+
+ final Supplier<String> readFile = () -> {
+ try (final FileInputStream fin = new FileInputStream(file);
+ final BufferedReader reader = new BufferedReader(new
InputStreamReader(fin))) {
+ final StringBuilder lines = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lines.append(line).append("\n");
+ }
+ return lines.toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // PeerSelector should restore peer statuses from managed cache.
+ PeerSelector peerSelector = new PeerSelector(peerStatusProvider, new
FilePeerPersistence(file));
+ peerSelector.refreshPeers();
+ assertEquals("Restored peers should be used",
+ "RAW\nnifi1:8081:false:true\nnifi2:8081:false:true\n",
readFile.get());
+
+ // If the stored state is too old, PeerSelector refreshes peers.
+ file.setLastModified(System.currentTimeMillis() - 120_000);
+ peerSelector = new PeerSelector(peerStatusProvider, new
FilePeerPersistence(file));
+ peerSelector.refreshPeers();
+ assertEquals("Peers should be refreshed",
+ "RAW\nnifi0:8081:false:true\n", readFile.get());
+ }
}
diff --git
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index c0b5e83..7c70a7a 100644
---
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -19,19 +19,21 @@ package org.apache.nifi.remote.client.socket;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
@@ -137,6 +139,40 @@ public class TestSiteToSiteClient {
}
@Test
+ public void testSerializationWithStateManager() {
+ final StateManager stateManager = Mockito.mock(StateManager.class);
+ final SiteToSiteClientConfig clientConfig = new
SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("input")
+ .stateManager(stateManager)
+ .buildConfig();
+
+ final Kryo kryo = new Kryo();
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final Output output = new Output(out);
+
+ try {
+ kryo.writeObject(output, clientConfig);
+ } finally {
+ output.close();
+ }
+
+ final ByteArrayInputStream in = new
ByteArrayInputStream(out.toByteArray());
+ final Input input = new Input(in);
+
+ try {
+ SiteToSiteClientConfig clientConfig2 = kryo.readObject(input,
SiteToSiteClient.StandardSiteToSiteClientConfig.class);
+ Assert.assertEquals(clientConfig.getUrls(),
clientConfig2.getUrls());
+ // Serialization works, but the state manager is not serialized.
+ Assert.assertNotNull(clientConfig.getStateManager());
+ Assert.assertNull(clientConfig2.getStateManager());
+ } finally {
+ input.close();
+ }
+ }
+
+ @Test
public void testGetUrlBackwardCompatibility() {
final Set<String> urls = new LinkedHashSet<>();
urls.add("http://node1:8080/nifi");
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 39be045..f9c1021 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -19,6 +19,7 @@ package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.VersionedComponent;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
@@ -240,4 +241,6 @@ public interface RemoteProcessGroup extends
ComponentAuthorizable, Positionable,
void verifyCanStopTransmitting();
void verifyCanUpdate();
+
+ StateManager getStateManager();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index a4fe16a..fd74d54 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -211,7 +211,9 @@ public class StandardFlowManager implements FlowManager {
}
public RemoteProcessGroup createRemoteProcessGroup(final String id, final
String uris) {
- return new StandardRemoteProcessGroup(requireNonNull(id), uris, null,
processScheduler, bulletinRepository, sslContext, nifiProperties);
+ return new StandardRemoteProcessGroup(requireNonNull(id), uris, null,
+ processScheduler, bulletinRepository, sslContext, nifiProperties,
+ flowController.getStateManagerProvider().getStateManager(id));
}
public void setRootGroup(final ProcessGroup rootGroup) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 5256552..82c51ab 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -853,6 +853,14 @@ public final class StandardProcessGroup implements
ProcessGroup {
remoteGroup.getInputPorts().forEach(scheduler::onPortRemoved);
remoteGroup.getOutputPorts().forEach(scheduler::onPortRemoved);
+ final StateManagerProvider stateManagerProvider =
flowController.getStateManagerProvider();
+ scheduler.submitFrameworkTask(new Runnable() {
+ @Override
+ public void run() {
+
stateManagerProvider.onComponentRemoved(remoteGroup.getIdentifier());
+ }
+ });
+
remoteGroups.remove(remoteGroupId);
LOG.info("{} removed from flow", remoteProcessGroup);
} finally {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 48e2127..8514b5d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,7 +18,6 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
@@ -54,6 +53,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
@@ -100,6 +100,7 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
private final ProcessScheduler scheduler;
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
+ private final StateManager stateManager;
private final long remoteContentsCacheExpiration;
private volatile boolean initialized = false;
@@ -146,8 +147,10 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String
targetUris, final ProcessGroup processGroup, final ProcessScheduler
processScheduler,
- final BulletinRepository
bulletinRepository, final SSLContext sslContext, final NiFiProperties
nifiProperties) {
+ final BulletinRepository
bulletinRepository, final SSLContext sslContext, final NiFiProperties
nifiProperties,
+ final StateManager stateManager) {
this.nifiProperties = nifiProperties;
+ this.stateManager = stateManager;
this.id = requireNonNull(id);
this.targetUris = targetUris;
@@ -234,11 +237,6 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
@Override
public void onRemove() {
backgroundThreadExecutor.shutdown();
-
- final File file = getPeerPersistenceFile();
- if (file.exists() && !file.delete()) {
- logger.warn("Failed to remove {}. This file should be removed
manually.", file);
- }
}
@Override
@@ -1366,11 +1364,6 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
}
- private File getPeerPersistenceFile() {
- final File stateDir = nifiProperties.getPersistentStateDirectory();
- return new File(stateDir, getIdentifier() + ".peers");
- }
-
@Override
public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());
@@ -1393,4 +1386,9 @@ public class StandardRemoteProcessGroup implements
RemoteProcessGroup {
}
}
}
+
+ @Override
+ public StateManager getStateManager() {
+ return stateManager;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3b4d630..bd2687e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -42,7 +42,6 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
@@ -56,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -125,11 +123,6 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
this.targetId = targetId;
}
- private static File getPeerPersistenceFile(final String portId, final
NiFiProperties nifiProperties, final SiteToSiteTransportProtocol
transportProtocol) {
- final File stateDir = nifiProperties.getPersistentStateDirectory();
- return new File(stateDir, String.format("%s_%s.peers", portId,
transportProtocol.name()));
- }
-
@Override
public boolean isTargetRunning() {
return targetRunning.get();
@@ -181,7 +174,7 @@ public class StandardRemoteGroupPort extends
RemoteGroupPort {
.sslContext(sslContext)
.useCompression(isUseCompression())
.eventReporter(remoteGroup.getEventReporter())
- .peerPersistenceFile(getPeerPersistenceFile(getIdentifier(),
nifiProperties, remoteGroup.getTransportProtocol()))
+ .stateManager(remoteGroup.getStateManager())
.nodePenalizationPeriod(penalizationMillis,
TimeUnit.MILLISECONDS)
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol())
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 46fcd80..86ec526 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -1675,6 +1675,14 @@ public interface NiFiServiceFacade {
*/
void clearReportingTaskState(String reportingTaskId);
+ /**
+ * Gets the state for the specified RemoteProcessGroup.
+ *
+ * @param remoteProcessGroupId the RemoteProcessGroup id
+ * @return the component state
+ */
+ ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId);
+
// ----------------------------------------
// Label methods
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 6adc662..9c96cc5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -1537,6 +1537,16 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
@Override
+ public ComponentStateDTO getRemoteProcessGroupState(String
remoteProcessGroupId) {
+ final StateMap clusterState = isClustered() ?
remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.CLUSTER) : null;
+ final StateMap localState =
remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.LOCAL);
+
+ // processor will be non null as it was already found when getting the
state
+ final RemoteProcessGroup remoteProcessGroup =
remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
+ return dtoFactory.createComponentStateDTO(remoteProcessGroupId,
remoteProcessGroup.getClass(), localState, clusterState);
+ }
+
+ @Override
public ConnectionEntity deleteConnection(final Revision revision, final
String connectionId) {
final Connection connection =
connectionDAO.getConnection(connectionId);
final PermissionsDTO permissions =
dtoFactory.createPermissionsDto(connection);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
index f034082..575e01b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java
@@ -30,10 +30,12 @@ import
org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
@@ -827,6 +829,61 @@ public class RemoteProcessGroupResource extends
ApplicationResource {
);
}
+ /**
+ * Gets the state for a RemoteProcessGroup.
+ *
+ * @param id The id of the RemoteProcessGroup
+ * @return a componentStateEntity
+ * @throws InterruptedException if interrupted
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/{id}/state")
+ @ApiOperation(
+ value = "Gets the state for a RemoteProcessGroup",
+ response = ComponentStateEntity.class,
+ authorizations = {
+ @Authorization(value = "Write - /remote-process-groups/{uuid}")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete
the request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to
make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could
not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi
was not in the appropriate state to process it. Retrying the same request later
may be successful.")
+ }
+ )
+ public Response getState(
+ @ApiParam(
+ value = "The processor id.",
+ required = true
+ )
+ @PathParam("id") final String id) throws InterruptedException {
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ // authorize access
+ serviceFacade.authorizeAccess(lookup -> {
+ final Authorizable authorizable = lookup.getRemoteProcessGroup(id);
+ authorizable.authorize(authorizer, RequestAction.WRITE,
NiFiUserUtils.getNiFiUser());
+ });
+
+ // get the component state
+ final ComponentStateDTO state =
serviceFacade.getRemoteProcessGroupState(id);
+
+ // generate the response entity
+ final ComponentStateEntity entity = new ComponentStateEntity();
+ entity.setComponentState(state);
+
+ // generate the response
+ return generateOkResponse(entity).build();
+ }
+
private RemoteProcessGroupDTO createDTOWithDesiredRunStatus(final String
id, final RemotePortRunStatusEntity entity) {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
dto.setId(id);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
index 636addb..adb50af 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ComponentStateDAO.java
@@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
public interface ComponentStateDAO {
@@ -71,4 +72,13 @@ public interface ComponentStateDAO {
* @param reportingTask reporting task
*/
void clearState(ReportingTaskNode reportingTask);
+
+ /**
+ * Gets the state map for the specified RemoteProcessGroup.
+ *
+ * @param remoteProcessGroup RemoteProcessGroup
+ * @param scope scope
+ * @return state map
+ */
+ StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope scope);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
index 2542185..7446a34 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/RemoteProcessGroupDAO.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.dao;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@@ -120,4 +122,12 @@ public interface RemoteProcessGroupDAO {
* @param remoteProcessGroupId The remote process group id
*/
void deleteRemoteProcessGroup(String remoteProcessGroupId);
+
+ /**
+ * Gets the specified RemoteProcessGroupId.
+ *
+ * @param remoteProcessGroupId RemoteProcessGroupId id
+ * @return state map
+ */
+ StateMap getState(String remoteProcessGroupId, Scope scope);
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index f0a9094..c48186b 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.dao.ComponentStateDAO;
@@ -90,6 +91,11 @@ public class StandardComponentStateDAO implements
ComponentStateDAO {
clearState(reportingTask.getIdentifier());
}
+ @Override
+ public StateMap getState(RemoteProcessGroup remoteProcessGroup, Scope
scope) {
+ return getState(remoteProcessGroup.getIdentifier(), scope);
+ }
+
/* setters */
public void setStateManagerProvider(StateManagerProvider
stateManagerProvider) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index b8399f7..274b5e4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -17,6 +17,8 @@
package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
@@ -31,6 +33,7 @@ import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import java.util.ArrayList;
@@ -43,6 +46,7 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements
RemoteProcessGroupDAO {
private FlowController flowController;
+ private ComponentStateDAO componentStateDAO;
private RemoteProcessGroup locateRemoteProcessGroup(final String
remoteProcessGroupId) {
final ProcessGroup rootGroup =
flowController.getFlowManager().getRootGroup();
@@ -465,7 +469,17 @@ public class StandardRemoteProcessGroupDAO extends
ComponentDAO implements Remot
remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup);
}
+ @Override
+ public StateMap getState(String remoteProcessGroupId, Scope scope) {
+ final RemoteProcessGroup remoteProcessGroup =
locateRemoteProcessGroup(remoteProcessGroupId);
+ return componentStateDAO.getState(remoteProcessGroup, scope);
+ }
+
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
+
+ public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
+ this.componentStateDAO = componentStateDAO;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 72ffc2c..56b8d4d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -82,6 +82,7 @@
</bean>
<bean id="remoteProcessGroupDAO"
class="org.apache.nifi.web.dao.impl.StandardRemoteProcessGroupDAO">
<property name="flowController" ref="flowController"/>
+ <property name="componentStateDAO" ref="componentStateDAO"/>
</bean>
<bean id="labelDAO" class="org.apache.nifi.web.dao.impl.StandardLabelDAO">
<property name="flowController" ref="flowController"/>