alopresto commented on a change in pull request #4289:
URL: https://github.com/apache/nifi/pull/4289#discussion_r435629323
##########
File path:
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
##########
@@ -16,353 +16,559 @@
*/
package org.apache.nifi.remote.client;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import javax.validation.constraints.NotNull;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.apache.nifi.remote.util.EventReportUtil.error;
-import static org.apache.nifi.remote.util.EventReportUtil.warn;
-
+/**
+ * Service which maintains state around peers (NiFi node(s) in a remote
instance (cluster or
+ * standalone)). There is an internal cache which stores identifying
information about each
+ * node and the current workload of each in number of flowfiles being
processed. Individual
+ * nodes can be penalized for an amount of time (see {@link #penalize(Peer,
long)}) to avoid
+ * sending/receiving data from them. Attempts are made to balance
communications ("busier"
+ * nodes will {@code TransferDirection.SEND} more and {@code
TransferDirection.RECEIVE} fewer
+ * flowfiles from this instance).
+ */
public class PeerSelector {
-
private static final Logger logger =
LoggerFactory.getLogger(PeerSelector.class);
- private static final long PEER_CACHE_MILLIS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
- private static final long PEER_REFRESH_PERIOD = 60000L;
+ // The timeout for the peer status cache
+ private static final long PEER_CACHE_MILLIS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
- private final ReentrantLock peerRefreshLock = new ReentrantLock();
- private volatile List<PeerStatus> peerStatuses;
- private volatile Set<PeerStatus> lastFetchedQueryablePeers;
- private volatile long peerRefreshTime = 0L;
- private final AtomicLong peerIndex = new AtomicLong(0L);
- private volatile PeerStatusCache peerStatusCache;
+ // The service which saves the peer state to persistent storage
private final PeerPersistence peerPersistence;
- private EventReporter eventReporter;
-
+ // The service which retrieves peer state
private final PeerStatusProvider peerStatusProvider;
- private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations
= new ConcurrentHashMap<>();
- static class SystemTime {
- long currentTimeMillis() {
- return System.currentTimeMillis();
- }
- }
- private SystemTime systemTime = new SystemTime();
+ // Maps the peer description to a millisecond penalty expiration
+ private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations
= new ConcurrentHashMap<>();
+
+ // The most recently fetched peer statuses
+ private volatile PeerStatusCache peerStatusCache;
+
+ private EventReporter eventReporter;
/**
- * Replace the SystemTime instance.
- * This method is purely used by unit testing, to emulate peer refresh
period.
+ * Returns a peer selector with the provided collaborators.
+ *
+ * @param peerStatusProvider the service which retrieves peer state
+ * @param peerPersistence the service which persists peer state
*/
- void setSystemTime(final SystemTime systemTime) {
- logger.info("Replacing systemTime instance to {}.", systemTime);
- this.systemTime = systemTime;
- }
-
public PeerSelector(final PeerStatusProvider peerStatusProvider, final
PeerPersistence peerPersistence) {
this.peerStatusProvider = peerStatusProvider;
this.peerPersistence = peerPersistence;
+ // On instantiation, retrieve the peer status cache
+ restoreInitialPeerStatusCache();
+ }
+
+ /**
+ * Populates the peer status cache from the peer persistence provider
(e.g. the file system or
+ * persisted cluster state). If this fails, it will log a warning and
continue, as it is not
+ * required for startup. If the cached protocol differs from the currently
configured protocol,
+ * the cache will be cleared.
+ */
+ private void restoreInitialPeerStatusCache() {
try {
PeerStatusCache restoredPeerStatusCache = null;
if (peerPersistence != null) {
restoredPeerStatusCache = peerPersistence.restore();
+
+ // If there is an existing cache, ensure that the protocol
matches the current protocol
if (restoredPeerStatusCache != null) {
final SiteToSiteTransportProtocol currentProtocol =
peerStatusProvider.getTransportProtocol();
final SiteToSiteTransportProtocol cachedProtocol =
restoredPeerStatusCache.getTransportProtocol();
+
+ // If the protocols have changed, clear the cache
if (!currentProtocol.equals(cachedProtocol)) {
- logger.info("Discard stored peer statuses in {}
because transport protocol has changed from {} to {}",
- peerPersistence.getClass().getSimpleName(),
cachedProtocol, currentProtocol);
+ logger.warn("Discard stored peer statuses in {}
because transport protocol has changed from {} to {}",
+ peerPersistence.getClass().getSimpleName(),
cachedProtocol, currentProtocol);
restoredPeerStatusCache = null;
}
}
}
this.peerStatusCache = restoredPeerStatusCache;
-
} catch (final IOException ioe) {
logger.warn("Failed to recover peer statuses from {} due to {};
will continue without loading information from file",
- peerPersistence.getClass().getSimpleName(), ioe);
+ peerPersistence.getClass().getSimpleName(), ioe);
}
}
- private void persistPeerStatuses() {
- try {
- peerPersistence.save(peerStatusCache);
- } catch (final IOException e) {
- error(logger, eventReporter, "Failed to persist list of Peers due
to {}; if restarted" +
- " and the nodes specified at the RPG are down," +
- " may be unable to transfer data until communications with
those nodes are restored", e.toString());
- logger.error("", e);
+ /**
+ * Returns the normalized weight for this ratio of peer flowfiles to total
flowfiles and the given direction. The number will be
+ * a Double between 0 and 100 indicating the percent of all flowfiles the
peer
+ * should send/receive. The transfer direction is <em>from the perspective
of this node to the peer</em>
+ * (i.e. how many flowfiles should <em>this node send</em> to the peer, or
how many flowfiles
+ * should <em>this node receive</em> from the peer).
+ *
+ * @param direction the transfer direction ({@code SEND} weights
the destinations higher if they have fewer flowfiles, {@code RECEIVE} weights
them higher if they have more)
+ * @param totalFlowFileCount the total flowfile count in the remote
instance (standalone or cluster)
+ * @param flowFileCount the flowfile count for the given peer
+ * @param peerCount the number of peers in the remote instance
+ * @return the normalized weight of this peer
+ */
+ private static double calculateNormalizedWeight(TransferDirection
direction, long totalFlowFileCount, int flowFileCount, int peerCount) {
+ // If there is only a single remote, send/receive all data to/from it
+ if (peerCount == 1) {
+ return 100;
}
- }
-
- List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses,
final TransferDirection direction) {
-
- final int numDestinations = Math.max(128, statuses.size());
- final Map<PeerStatus, Integer> entryCountMap = new HashMap<>();
- long totalFlowFileCount = 0L;
- for (final PeerStatus nodeInfo : statuses) {
- totalFlowFileCount += nodeInfo.getFlowFileCount();
+ double cappedPercent;
+ // If no flowfiles exist in the remote instance, evenly weight each
node with 1/N
+ if (totalFlowFileCount == 0) {
+ cappedPercent = 1.0 / peerCount;
+ } else {
+ final double percentageOfFlowFiles = ((double) flowFileCount /
totalFlowFileCount);
+ cappedPercent = percentageOfFlowFiles;
+
+ // If sending to the remote, allocate more flowfiles to the
less-stressed peers
+ if (direction == TransferDirection.SEND) {
+ cappedPercent = (1 - percentageOfFlowFiles) / (peerCount - 1);
+ }
}
+ return new BigDecimal(cappedPercent * 100).setScale(2,
RoundingMode.FLOOR).doubleValue();
+ }
- int totalEntries = 0;
- for (final PeerStatus nodeInfo : statuses) {
- final int flowFileCount = nodeInfo.getFlowFileCount();
- // don't allow any node to get more than 80% of the data
- final double percentageOfFlowFiles = Math.min(0.8D, ((double)
flowFileCount / (double) totalFlowFileCount));
- final double relativeWeighting = (direction ==
TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
- final int entries = Math.max(1, (int) (numDestinations *
relativeWeighting));
-
- entryCountMap.put(nodeInfo, Math.max(1, entries));
- totalEntries += entries;
+ /**
+ * Returns an ordered map of peers sorted in descending order by value
(relative weight).
+ *
+ * @param unsortedMap the unordered map of peers to weights
+ * @return the sorted (desc) map (by value)
+ */
+ private static LinkedHashMap<PeerStatus, Double>
sortMapByWeight(Map<PeerStatus, Double> unsortedMap) {
+ List<Map.Entry<PeerStatus, Double>> list = new
ArrayList<>(unsortedMap.entrySet());
+ list.sort(Map.Entry.comparingByValue());
+
+ LinkedHashMap<PeerStatus, Double> result = new LinkedHashMap<>();
+ for (int i = list.size() - 1; i >= 0; i--) {
+ Map.Entry<PeerStatus, Double> entry = list.get(i);
+ result.put(entry.getKey(), entry.getValue());
}
- final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
- for (int i = 0; i < totalEntries; i++) {
- destinations.add(null);
- }
- for (final Map.Entry<PeerStatus, Integer> entry :
entryCountMap.entrySet()) {
- final PeerStatus nodeInfo = entry.getKey();
- final int numEntries = entry.getValue();
-
- int skipIndex = numEntries;
- for (int i = 0; i < numEntries; i++) {
- int n = (skipIndex * i);
- while (true) {
- final int index = n % destinations.size();
- PeerStatus status = destinations.get(index);
- if (status == null) {
- status = new PeerStatus(nodeInfo.getPeerDescription(),
nodeInfo.getFlowFileCount(), nodeInfo.isQueryForPeers());
- destinations.set(index, status);
- break;
- } else {
- n++;
- }
- }
+ return result;
+ }
+
+ /**
+ * Prints the distribution of the peers to the logger.
+ *
+ * @param sortedPeerWorkloads the peers and relative weights
+ */
+ private static void printDistributionStatistics(Map<PeerStatus, Double>
sortedPeerWorkloads, TransferDirection direction) {
+ if (logger.isDebugEnabled() && sortedPeerWorkloads != null) {
+ DecimalFormat df = new DecimalFormat("##.##");
+ df.setRoundingMode(RoundingMode.FLOOR);
+ final StringBuilder distributionDescription = new StringBuilder();
+ distributionDescription.append("New weighted distribution of
nodes:");
+ for (final Map.Entry<PeerStatus, Double> entry :
sortedPeerWorkloads.entrySet()) {
+ final double percentage = entry.getValue();
+ distributionDescription.append("\n").append(entry.getKey())
+ .append(" will").append(direction ==
TransferDirection.RECEIVE ? " send " : " receive ")
+ .append(df.format(percentage)).append("% of data");
}
+ logger.debug(distributionDescription.toString());
}
+ }
- // Shuffle destinations to provide better distribution.
- // Without this, same host will be used continuously, especially when
remote peers have the same number of queued files.
- // Use Random(0) to provide consistent result for unit testing.
Randomness is not important to shuffle destinations.
- Collections.shuffle(destinations, new Random(0));
+ /**
+ * Returns the total of all values in the map. This method is frequently
used to calculate the total number of
+ * flowfiles in the instance from the respective peer flowfile counts or
the total percentage from the relative weights.
+ *
+ * @param peerWeightMap the map of peers to flowfile counts or relative
weights
+ * @return the total of the map values
+ */
+ private static double sumMapValues(Map<PeerStatus, Double> peerWeightMap) {
+ return
peerWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
+ }
- final StringBuilder distributionDescription = new StringBuilder();
- distributionDescription.append("New Weighted Distribution of Nodes:");
- for (final Map.Entry<PeerStatus, Integer> entry :
entryCountMap.entrySet()) {
- final double percentage = entry.getValue() * 100D /
destinations.size();
-
distributionDescription.append("\n").append(entry.getKey()).append(" will
receive ").append(percentage).append("% of data");
- }
- logger.info(distributionDescription.toString());
+ /**
+ * Resets all penalization states for the peers.
+ */
+ public void clear() {
+ peerPenaltyExpirations.clear();
+ }
+
+ /**
+ * Return status of a peer that will be used for the next communication.
+ * The peers with lower workloads will be selected with higher probability.
+ *
+ * @param direction the amount of workload is calculated based on
transaction direction,
+ * for SEND, a peer with fewer flow files is preferred,
+ * for RECEIVE, a peer with more flow files is preferred
+ * @return a selected peer, if there is no available peer or all peers are
penalized, then return null
+ */
+ public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+ Set<PeerStatus> peerStatuses = getPeerStatuses();
+ Map<PeerStatus, Double> orderedPeerStatuses =
buildWeightedPeerMap(peerStatuses, direction);
+
+ return getAvailablePeerStatus(orderedPeerStatuses);
+ }
- // Jumble the list of destinations.
- return destinations;
+ /**
+ * Returns {@code true} if this peer is currently penalized and should not
send/receive flowfiles.
+ *
+ * @param peerStatus the peer status identifying the peer
+ * @return true if this peer is penalized
+ */
+ public boolean isPenalized(final PeerStatus peerStatus) {
+ final Long expirationEnd =
peerPenaltyExpirations.get(peerStatus.getPeerDescription());
+ return (expirationEnd != null && expirationEnd >
System.currentTimeMillis());
}
/**
* Updates internal state map to penalize a PeerStatus that points to the
- * specified peer
+ * specified peer.
*
- * @param peer the peer
- * @param penalizationMillis period of time to penalize a given peer
+ * @param peer the peer
+ * @param penalizationMillis period of time to penalize a given peer
(relative time, not absolute)
*/
public void penalize(final Peer peer, final long penalizationMillis) {
penalize(peer.getDescription(), penalizationMillis);
}
+ /**
+ * Updates internal state map to penalize a PeerStatus that points to the
+ * specified peer.
+ *
+ * @param peerDescription the peer description (identifies the peer)
+ * @param penalizationMillis period of time to penalize a given peer
(relative time, not absolute)
+ */
public void penalize(final PeerDescription peerDescription, final long
penalizationMillis) {
- Long expiration = peerTimeoutExpirations.get(peerDescription);
+ Long expiration = peerPenaltyExpirations.get(peerDescription);
if (expiration == null) {
- expiration = Long.valueOf(0L);
+ expiration = 0L;
}
- final long newExpiration = Math.max(expiration,
systemTime.currentTimeMillis() + penalizationMillis);
- peerTimeoutExpirations.put(peerDescription,
Long.valueOf(newExpiration));
+ final long newExpiration = Math.max(expiration,
System.currentTimeMillis() + penalizationMillis);
+ peerPenaltyExpirations.put(peerDescription, newExpiration);
}
- public boolean isPenalized(final PeerStatus peerStatus) {
- final Long expirationEnd =
peerTimeoutExpirations.get(peerStatus.getPeerDescription());
- return (expirationEnd != null && expirationEnd >
systemTime.currentTimeMillis());
+ /**
+ * Allows for external callers to trigger a refresh of the internal peer
status cache. Performs the refresh if the cache has expired. If the cache is
still valid, skips the refresh.
+ */
+ public void refresh() {
+ long cacheAgeMs = getCacheAge();
+ logger.debug("External refresh triggered. Last refresh was {} ms ago",
cacheAgeMs);
+ if (isPeerRefreshNeeded()) {
+ logger.debug("Refreshing peer status cache");
+ refreshPeerStatusCache();
+ } else {
+ logger.debug("Cache is still valid; skipping refresh");
+ }
}
- public void clear() {
- peerTimeoutExpirations.clear();
+ /**
+ * Sets the event reporter instance.
+ *
+ * @param eventReporter the event reporter
+ */
+ public void setEventReporter(EventReporter eventReporter) {
+ this.eventReporter = eventReporter;
}
- private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
- return (peerList == null || peerList.isEmpty() ||
systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+ /**
+ * Returns a map of peers prepared for flowfile transfer in the specified
direction. Each peer is a key and the value is a
+ * weighted percentage of the total flowfiles in the remote instance. For
example, in a cluster where the total number of flowfiles
+ * is 100, distributed across three nodes 20 in A, 30 in B, and 50 in C,
the resulting map for
+ * {@code SEND} will be {@code [A:40.0, B:35.0, C:25.0]} (for A: (1 - .2) => .8 *
100 / (3 - 1) => 40.0).
+ *
+ * @param statuses the set of all peers
+ * @param direction the direction of transfer ({@code SEND} weights the
destinations higher if they have fewer flowfiles, {@code RECEIVE} weights them
higher if they have more)
+ * @return the ordered map of each peer to its relative weight
+ */
+ LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(final
Set<PeerStatus> statuses, final TransferDirection direction) {
+ // Get all the destinations with their relative weights
+ final Map<PeerStatus, Double> peerWorkloads =
createDestinationMap(statuses, direction);
+
+ if (!peerWorkloads.isEmpty()) {
+ // This map is sorted, but not by key, so it cannot use SortedMap
+ LinkedHashMap<PeerStatus, Double> sortedPeerWorkloads =
sortMapByWeight(peerWorkloads);
+
+ // Print the expected distribution of the peers
+ printDistributionStatistics(sortedPeerWorkloads, direction);
+
+ return sortedPeerWorkloads;
+ } else {
+ logger.debug("No peers available");
+ return new LinkedHashMap<>();
+ }
}
/**
- * Return status of a peer that will be used for the next communication.
- * The peer with less workload will be selected with higher probability.
- * @param direction the amount of workload is calculated based on
transaction direction,
- * for SEND, a peer with less flow files is preferred,
- * for RECEIVE, a peer with more flow files is preferred
- * @return a selected peer, if there is no available peer or all peers are
penalized, then return null
+ * Returns a map indexed by a peer to the normalized weight (number of
flowfiles currently being
+ * processed by the peer as a percentage of the total). This is used to
allocate flowfiles to
+ * the various peers as destinations.
+ *
+ * @param peerStatuses the set of peers, along with their current workload
(number of flowfiles)
+ * @param direction whether sending flowfiles to these peers or
receiving them
+ * @return the map of weighted peers
*/
- public PeerStatus getNextPeerStatus(final TransferDirection direction) {
- List<PeerStatus> peerList = peerStatuses;
- if (isPeerRefreshNeeded(peerList)) {
- peerRefreshLock.lock();
+ @NotNull
+ private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus>
peerStatuses, TransferDirection direction) {
+ final Map<PeerStatus, Double> peerWorkloads = new HashMap<>();
+
+ // Calculate the total number of flowfiles in the peers
+ long totalFlowFileCount =
peerStatuses.stream().mapToLong(PeerStatus::getFlowFileCount).sum();
+ logger.debug("Building weighted map of peers with total remote NiFi
flowfile count: {}", totalFlowFileCount);
+
+ // For each node, calculate the relative weight and store it in the map
+ for (final PeerStatus nodeInfo : peerStatuses) {
+ final int flowFileCount = nodeInfo.getFlowFileCount();
+ final double normalizedWeight =
calculateNormalizedWeight(direction, totalFlowFileCount, flowFileCount,
peerStatuses.size());
+ peerWorkloads.put(nodeInfo, normalizedWeight);
+ }
+
+ return peerWorkloads;
+ }
+
+ /**
+ * Returns a set of {@link PeerStatus} objects representing all remote
peers for the provided
+ * {@link PeerDescription}s. If a queried peer returns updated state on a
peer which has already
+ * been captured, the new state is used.
+ * <p>
+ * Example:
+ * <p>
+ * 3 node cluster with nodes A, B, C
+ * <p>
+ * Node A knows about Node B and Node C, B about A and C, etc.
+ *
+ * <pre>
+ * Action | Statuses
+ * query(A) -> B.status, C.status | Bs1, Cs1
+ * query(B) -> A.status, C.status | As1, Bs1, Cs2
+ * query(C) -> A.status, B.status | As2, Bs2, Cs2
+ * </pre>
+ *
+ * @param peersToRequestClusterInfoFrom the set of peers to query
+ * @return the complete set of statuses for each collection of peers
+ * @throws IOException if there is a problem fetching peer statuses
+ */
+ private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription>
peersToRequestClusterInfoFrom) throws IOException {
+ logger.debug("Fetching remote peer statuses from: {}",
peersToRequestClusterInfoFrom);
+ Exception lastFailure = null;
+
+ final Set<PeerStatus> allPeerStatuses = new HashSet<>();
+
+ // Iterate through all peers, getting (sometimes multiple) status(es)
from each
+ for (final PeerDescription peerDescription :
peersToRequestClusterInfoFrom) {
try {
- // now that we have the lock, check again that we need to
refresh (because another thread
- // could have been refreshing while we were waiting for the
lock).
- peerList = peerStatuses;
- if (isPeerRefreshNeeded(peerList)) {
- try {
- peerList = createPeerStatusList(direction);
- } catch (final Exception e) {
- final String message = String.format("%s Failed to
update list of peers due to %s", this, e.toString());
- warn(logger, eventReporter, message);
- if (logger.isDebugEnabled()) {
- logger.warn("", e);
- }
- }
+ // Retrieve the peer status(es) from each peer description
+ final Set<PeerStatus> statusesForPeerDescription =
peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
- this.peerStatuses = peerList;
- peerRefreshTime = systemTime.currentTimeMillis();
- }
- } finally {
- peerRefreshLock.unlock();
+ // Filter to remove any peers which are not queryable
+ final Set<PeerStatus> filteredStatuses =
statusesForPeerDescription.stream()
+ .filter(PeerStatus::isQueryForPeers)
+ .collect(Collectors.toSet());
+
+ allPeerStatuses.addAll(filteredStatuses);
+ } catch (final Exception e) {
+ logger.warn("Could not communicate with {}:{} to determine
which node(s) exist in the remote NiFi instance, due to {}",
+ peerDescription.getHostname(),
peerDescription.getPort(), e.toString());
+ lastFailure = e;
}
}
- if (peerList == null || peerList.isEmpty()) {
+ // If no peers were fetched and an exception was the cause, throw an
exception
+ if (allPeerStatuses.isEmpty() && lastFailure != null) {
+ throw new IOException("Unable to retrieve nodes from remote
instance", lastFailure);
+ }
+
+ return allPeerStatuses;
+ }
+
+ /**
+ * Returns the {@link PeerStatus} identifying the next peer to
send/receive data. This uses random
+ * selection of peers, weighted by the relative desirability (i.e. for
{@code SEND}, peers with fewer
+ * flowfiles are more likely to be selected, and for {@code RECEIVE},
peers with more flowfiles are
+ * more likely).
+ *
+ * @param orderedPeerStatuses the map of peers to relative weights, sorted
in descending order by weight
+ * @return the peer to send/receive data
+ */
+ private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double>
orderedPeerStatuses) {
+ if (orderedPeerStatuses == null || orderedPeerStatuses.isEmpty()) {
+ logger.warn("Available peers collection is empty; no peer
available");
return null;
}
- PeerStatus peerStatus;
- for (int i = 0; i < peerList.size(); i++) {
- final long idx = peerIndex.getAndIncrement();
- final int listIndex = (int) (idx % peerList.size());
- peerStatus = peerList.get(listIndex);
+ final double totalWeights = sumMapValues(orderedPeerStatuses);
+ logger.debug("Determining next available peer ({} peers with total
weight {})", orderedPeerStatuses.keySet().size(), totalWeights);
+
+ final double random = Math.random() * 100;
+ logger.debug("Generated random value {}", random);
+ if (random > totalWeights) {
+ logger.warn("Random selection was outside of the precision of the
weights ({}, {}); allocating to the first available peer", random,
totalWeights);
+ return new ArrayList<>(orderedPeerStatuses.keySet()).get(0);
+ }
- if (isPenalized(peerStatus)) {
- logger.debug("{} {} is penalized; will not communicate with
this peer", this, peerStatus);
- } else {
- return peerStatus;
+ double threshold = 0.0;
+ for (Map.Entry<PeerStatus, Double> e : orderedPeerStatuses.entrySet())
{
+ logger.debug("Initial threshold was {}; added peer value {}; total
{}", threshold, e.getValue(), threshold + e.getValue());
+ threshold += e.getValue();
+ if (random <= threshold) {
+ PeerStatus peerStatus = e.getKey();
+ if (isPenalized(peerStatus)) {
+ logger.debug("{} is penalized; will not communicate with
this peer", peerStatus);
+ } else {
+ return peerStatus;
+ }
}
}
- logger.debug("{} All peers appear to be penalized; returning null",
this);
+ logger.debug("Did not select a peer; r {}, t {}, w {}", random,
threshold, orderedPeerStatuses.values());
+ logger.debug("All peers appear to be penalized; returning null");
return null;
}
- private List<PeerStatus> createPeerStatusList(final TransferDirection
direction) throws IOException {
- Set<PeerStatus> statuses = getPeerStatuses();
- if (statuses == null) {
- refreshPeers();
- statuses = getPeerStatuses();
- if (statuses == null) {
- logger.debug("{} found no peers to connect to", this);
- return Collections.emptyList();
- }
+ /**
+ * Returns the cache age in milliseconds. If the cache is null or not set,
returns {@code -1}.
+ *
+ * @return the cache age in millis
+ */
+ private long getCacheAge() {
+ if (peerStatusCache == null) {
+ return -1;
}
- return formulateDestinationList(statuses, direction);
+ return System.currentTimeMillis() - peerStatusCache.getTimestamp();
}
- private Set<PeerStatus> getPeerStatuses() {
- final PeerStatusCache cache = this.peerStatusCache;
- if (cache == null || cache.getStatuses() == null ||
cache.getStatuses().isEmpty()) {
- return null;
- }
-
- if (cache.getTimestamp() + PEER_CACHE_MILLIS <
systemTime.currentTimeMillis()) {
- final Set<PeerStatus> equalizedSet = new
HashSet<>(cache.getStatuses().size());
- for (final PeerStatus status : cache.getStatuses()) {
- final PeerStatus equalizedStatus = new
PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
- equalizedSet.add(equalizedStatus);
- }
+ /**
+ * Returns the set of queryable peers ({@link
PeerStatus#isQueryForPeers()}) most recently fetched.
+ *
+ * @return the set of queryable peers (empty set if the cache is {@code
null})
+ */
+ @NotNull
+ private Set<PeerStatus> getLastFetchedQueryablePeers() {
+ return peerStatusCache != null ? peerStatusCache.getStatuses() :
Collections.emptySet();
+ }
- return equalizedSet;
+ /**
+ * Returns the set of peer statuses. If the cache is {@code null},
empty, or expired, refreshes the cache first and then returns the new peer status set.
+ *
+ * @return the most recent peer statuses (empty set if the cache is {@code
null})
+ */
+ @NotNull
+ private Set<PeerStatus> getPeerStatuses() {
+ if (isPeerRefreshNeeded()) {
+ refreshPeerStatusCache();
}
- return cache.getStatuses();
+ return getLastFetchedQueryablePeers();
}
- public void refreshPeers() {
- final PeerStatusCache existingCache = peerStatusCache;
- if (existingCache != null && (existingCache.getTimestamp() +
PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
- return;
- }
+ /**
+ * Returns the set of {@link PeerDescription} objects uniquely identifying
each NiFi node which should be queried for {@link PeerStatus}.
+ *
+ * @return the set of recently retrieved peers and the bootstrap peer
+ * @throws IOException if there is a problem retrieving the list of peers
to query
+ */
+ private Set<PeerDescription> getPeersToQuery() throws IOException {
+ final Set<PeerDescription> peersToRequestClusterInfoFrom = new
HashSet<>();
- try {
- final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
- peerStatusCache = new PeerStatusCache(statuses,
System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
- persistPeerStatuses();
- logger.info("{} Successfully refreshed Peer Status; remote
instance consists of {} peers", this, statuses.size());
- } catch (Exception e) {
- warn(logger, eventReporter, "{} Unable to refresh Remote Group's
peers due to {}", this, e.getMessage());
- if (logger.isDebugEnabled()) {
- logger.debug("", e);
+ // Use the peers fetched last time
+ final Set<PeerStatus> lastFetched = getLastFetchedQueryablePeers();
+ if (lastFetched != null && !lastFetched.isEmpty()) {
+ for (PeerStatus peerStatus : lastFetched) {
+
peersToRequestClusterInfoFrom.add(peerStatus.getPeerDescription());
}
}
+
+ // Always add the configured node info to the list of peers
+
peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+
+ return peersToRequestClusterInfoFrom;
}
- public void setEventReporter(EventReporter eventReporter) {
- this.eventReporter = eventReporter;
+ /**
+ * Returns {@code true} if this cache has expired.
+ *
+ * @param cache the peer status cache
+ * @return true if the cache is expired
+ */
+ private boolean isCacheExpired(PeerStatusCache cache) {
+ return cache == null || cache.getTimestamp() + PEER_CACHE_MILLIS <
System.currentTimeMillis();
}
- private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
- final Set<PeerDescription> peersToRequestClusterInfoFrom = new
HashSet<>();
+ /**
+ * Returns {@code true} if the internal collection of peers is empty or
the refresh time has passed.
+ *
+ * @return true if the peer statuses should be refreshed
+ */
+ private boolean isPeerRefreshNeeded() {
+ return (peerStatusCache == null || peerStatusCache.isEmpty() ||
isCacheExpired(peerStatusCache));
+ }
- // Look at all of the peers that we fetched last time.
- final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
- if (lastFetched != null && !lastFetched.isEmpty()) {
- lastFetched.stream().map(peer -> peer.getPeerDescription())
- .forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
- }
+ /**
+ * Persists the provided cache instance (in memory and via the {@link
PeerPersistence} (e.g. in cluster state or a local file)) for future retrieval.
+ *
+ * @param peerStatusCache the cache of current peer statuses to persist
+ */
+ private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
+ try {
+ this.peerStatusCache = peerStatusCache;
- // Always add the configured node info to the list of peers to
communicate with
-
peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+ // The #save mechanism persists the cache to stateful or
file-based storage
+ peerPersistence.save(peerStatusCache);
+ } catch (final IOException e) {
+ error(logger, eventReporter, "Failed to persist list of peers due
to {}; if restarted" +
+ " and the nodes specified at the remote instance are
down," +
+ " may be unable to transfer data until communications with
those nodes are restored", e.toString());
+ logger.error("", e);
+ }
+ }
- logger.debug("Fetching remote peer statuses from: {}",
peersToRequestClusterInfoFrom);
- Exception lastFailure = null;
- for (final PeerDescription peerDescription :
peersToRequestClusterInfoFrom) {
- try {
- final Set<PeerStatus> statuses =
peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
- lastFetchedQueryablePeers = statuses.stream()
- .filter(p -> p.isQueryForPeers())
- .collect(Collectors.toSet());
+ /**
+ * Refreshes the list of S2S peers that flowfiles can be sent to or
received from. Uses the stateful
+ * cache to reduce network overhead.
+ */
+ private void refreshPeerStatusCache() {
+ try {
+ // Splitting enumeration and querying into separate methods allows
better testing and composition
+ final Set<PeerDescription> peersToQuery = getPeersToQuery();
+ final Set<PeerStatus> statuses =
fetchRemotePeerStatuses(peersToQuery);
- return statuses;
- } catch (final Exception e) {
- logger.warn("Could not communicate with {}:{} to determine
which nodes exist in the remote NiFi cluster, due to {}",
- peerDescription.getHostname(),
peerDescription.getPort(), e.toString());
- lastFailure = e;
+ if (statuses.isEmpty()) {
+ logger.info("No peers were retrieved from the remote group
{}", peersToQuery.stream().map(p -> p.getHostname() + ":" +
p.getPort()).collect(Collectors.joining(",")));
}
- }
- final IOException ioe = new IOException("Unable to communicate with
remote NiFi cluster in order to determine which nodes exist in the remote
cluster");
- if (lastFailure != null) {
- ioe.addSuppressed(lastFailure);
+ // Persist the fetched peer statuses
+ PeerStatusCache peerStatusCache = new PeerStatusCache(statuses,
System.currentTimeMillis(), peerStatusProvider.getTransportProtocol());
+ persistPeerStatuses(peerStatusCache);
+ logger.info("Successfully refreshed peer status cache; remote
group consists of {} peers", statuses.size());
+ } catch (Exception e) {
+ warn(logger, eventReporter, "Unable to refresh remote group peers
due to: {}", e.getMessage());
+ if (logger.isDebugEnabled() && e.getCause() != null) {
+ Throwable cause = e.getCause();
+ while ((cause = cause.getCause()) != null) {
Review comment:
This was introduced because the exceptions thrown here often have very long
stack traces (hundreds of lines) and occur very frequently (if the TLS config
is incorrect, they can be thrown many times per second, making the log
difficult or impossible to read). I've simplified this to just print the
stack trace if debug is enabled, but I think there is still room for
improvement down the line.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]