Repository: nifi
Updated Branches:
  refs/heads/master 1a9d505b4 -> a3586e04d


NIFI-2459: Site-to-Site bootstrap node failure

Refresh remote peer statuses even if the bootstrap node goes down.

Migrate the existing code that handles this situation from
EndpointConnectionPool to PeerSelector, so that both the RAW and HTTP
transport protocols have the same capability.

This closes #927.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3586e04
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3586e04
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3586e04

Branch: refs/heads/master
Commit: a3586e04d9978e105cc5645e893dc6d77b79b86e
Parents: 1a9d505
Author: Koji Kawamura <[email protected]>
Authored: Wed Aug 24 17:59:25 2016 +0900
Committer: Mark Payne <[email protected]>
Committed: Wed Aug 24 17:28:10 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/remote/client/PeerSelector.java | 71 +++++++++++++--
 .../nifi/remote/client/PeerStatusProvider.java  | 35 +++++++-
 .../nifi/remote/client/http/HttpClient.java     | 12 ++-
 .../client/socket/EndpointConnectionPool.java   | 60 +++----------
 .../nifi/remote/client/TestPeerSelector.java    | 95 ++++++++++++++++++++
 5 files changed, 216 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a3586e04/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index e452b0f..0ec8951 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.nifi.remote.util.EventReportUtil.error;
 import static org.apache.nifi.remote.util.EventReportUtil.warn;
@@ -61,6 +62,7 @@ public class PeerSelector {
 
     private final ReentrantLock peerRefreshLock = new ReentrantLock();
     private volatile List<PeerStatus> peerStatuses;
+    private volatile Set<PeerStatus> lastFetchedQueryablePeers;
     private volatile long peerRefreshTime = 0L;
     private final AtomicLong peerIndex = new AtomicLong(0L);
     private volatile PeerStatusCache peerStatusCache;
@@ -71,6 +73,22 @@ public class PeerSelector {
     private final PeerStatusProvider peerStatusProvider;
     private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations 
= new ConcurrentHashMap<>();
 
+    static class SystemTime {
+        long currentTimeMillis() {
+            return System.currentTimeMillis();
+        }
+    }
+    private SystemTime systemTime = new SystemTime();
+
+    /**
+     * Replace the SystemTime instance.
+     * This method is purely used by unit testing, to emulate peer refresh 
period.
+     */
+    void setSystemTime(final SystemTime systemTime) {
+        logger.info("Replacing systemTime instance to {}.", systemTime);
+        this.systemTime = systemTime;
+    }
+
     public PeerSelector(final PeerStatusProvider peerStatusProvider, final 
File persistenceFile) {
         this.peerStatusProvider = peerStatusProvider;
         this.persistenceFile = persistenceFile;
@@ -213,13 +231,13 @@ public class PeerSelector {
             expiration = Long.valueOf(0L);
         }
 
-        final long newExpiration = Math.max(expiration, 
System.currentTimeMillis() + penalizationMillis);
+        final long newExpiration = Math.max(expiration, 
systemTime.currentTimeMillis() + penalizationMillis);
         peerTimeoutExpirations.put(peerDescription, 
Long.valueOf(newExpiration));
     }
 
     public boolean isPenalized(final PeerStatus peerStatus) {
         final Long expirationEnd = 
peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd != null && expirationEnd > 
System.currentTimeMillis());
+        return (expirationEnd != null && expirationEnd > 
systemTime.currentTimeMillis());
     }
 
     public void clear() {
@@ -227,7 +245,7 @@ public class PeerSelector {
     }
 
     private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
-        return (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+        return (peerList == null || peerList.isEmpty() || 
systemTime.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
     }
 
     /**
@@ -258,7 +276,7 @@ public class PeerSelector {
                     }
 
                     this.peerStatuses = peerList;
-                    peerRefreshTime = System.currentTimeMillis();
+                    peerRefreshTime = systemTime.currentTimeMillis();
                 }
             } finally {
                 peerRefreshLock.unlock();
@@ -305,7 +323,7 @@ public class PeerSelector {
             return null;
         }
 
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < 
System.currentTimeMillis()) {
+        if (cache.getTimestamp() + PEER_CACHE_MILLIS < 
systemTime.currentTimeMillis()) {
             final Set<PeerStatus> equalizedSet = new 
HashSet<>(cache.getStatuses().size());
             for (final PeerStatus status : cache.getStatuses()) {
                 final PeerStatus equalizedStatus = new 
PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers());
@@ -320,12 +338,12 @@ public class PeerSelector {
 
     public void refreshPeers() {
         final PeerStatusCache existingCache = peerStatusCache;
-        if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
+        if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > systemTime.currentTimeMillis())) {
             return;
         }
 
         try {
-            final Set<PeerStatus> statuses = 
peerStatusProvider.fetchRemotePeerStatuses();
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
             persistPeerStatuses(statuses);
             peerStatusCache = new PeerStatusCache(statuses);
             logger.info("{} Successfully refreshed Peer Status; remote 
instance consists of {} peers", this, statuses.size());
@@ -340,4 +358,43 @@ public class PeerSelector {
     public void setEventReporter(EventReporter eventReporter) {
         this.eventReporter = eventReporter;
     }
+
+    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
+        final Set<PeerDescription> peersToRequestClusterInfoFrom = new 
HashSet<>();
+
+        // Look at all of the peers that we fetched last time.
+        final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
+        if (lastFetched != null && !lastFetched.isEmpty()) {
+            lastFetched.stream().map(peer -> peer.getPeerDescription())
+                    .forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
+        }
+
+        // Always add the configured node info to the list of peers to 
communicate with
+        
peersToRequestClusterInfoFrom.add(peerStatusProvider.getBootstrapPeerDescription());
+
+        logger.debug("Fetching remote peer statuses from: {}", 
peersToRequestClusterInfoFrom);
+        Exception lastFailure = null;
+        for (final PeerDescription peerDescription : 
peersToRequestClusterInfoFrom) {
+            try {
+                final Set<PeerStatus> statuses = 
peerStatusProvider.fetchRemotePeerStatuses(peerDescription);
+                lastFetchedQueryablePeers = statuses.stream()
+                        .filter(p -> p.isQueryForPeers())
+                        .collect(Collectors.toSet());
+
+                return statuses;
+            } catch (final Exception e) {
+                logger.warn("Could not communicate with {}:{} to determine 
which nodes exist in the remote NiFi cluster, due to {}",
+                        peerDescription.getHostname(), 
peerDescription.getPort(), e.toString());
+                lastFailure = e;
+            }
+        }
+
+        final IOException ioe = new IOException("Unable to communicate with 
remote NiFi cluster in order to determine which nodes exist in the remote 
cluster");
+        if (lastFailure != null) {
+            ioe.addSuppressed(lastFailure);
+        }
+
+        throw ioe;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3586e04/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
index 68c30af..817bccf 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -16,12 +16,45 @@
  */
 package org.apache.nifi.remote.client;
 
+import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
 
 import java.io.IOException;
 import java.util.Set;
 
+/**
+ * This interface defines methods used from {@link PeerSelector}.
+ */
 public interface PeerStatusProvider {
 
-    Set<PeerStatus> fetchRemotePeerStatuses() throws IOException;
+    /**
+     * <p>
+     * Returns a PeerDescription instance, which represents a bootstrap remote 
NiFi node.
+     * The bootstrap node is always used to fetch remote peer statuses.
+     * </p>
+     * <p>
+     * Once the PeerSelector successfully got remote peer statuses, it 
periodically fetches remote peer statuses,
+     * so that it can detect remote NiFi cluster topology changes such as 
addition or removal of nodes.
+     * To refresh remote peer statuses, PeerSelector calls {@link 
#fetchRemotePeerStatuses} with one of query-able nodes
+     * lastly fetched from the remote NiFi cluster, until it gets a successful 
result,
+     * or throws IOException if none of them responds successfully.
+     * </p>
+     * <p>
+     * This mechanism lets PeerSelector works even if the bootstrap remote 
NiFi node goes down.
+     * </p>
+     * @return peer description of a bootstrap remote NiFi node
+     * @throws IOException thrown when it fails to retrieve the bootstrap 
remote node information
+     */
+    PeerDescription getBootstrapPeerDescription() throws IOException;
+
+    /**
+     * Fetch peer statuses from a remote NiFi cluster.
+     * Implementation of this method should fetch peer statuses from the node
+     * represented by the passed PeerDescription using its transport protocol.
+     * @param peerDescription a bootstrap node or one of query-able nodes 
lastly fetched successfully
+     * @return Remote peer statuses
+     * @throws IOException thrown when it fails to fetch peer statuses of the 
remote cluster from the specified peer
+     */
+    Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription 
peerDescription) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3586e04/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
index 3c92acd..0f0d4a5 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -84,12 +84,11 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
     }
 
     @Override
-    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
+    public PeerDescription getBootstrapPeerDescription() throws IOException {
         if (siteInfoProvider.getSiteToSiteHttpPort() == null) {
             throw new IOException("Remote instance of NiFi is not configured 
to allow HTTP site-to-site communications");
         }
 
-        final String scheme = siteInfoProvider.isSecure() ? "https" : "http";
         final URI clusterUrl;
         try {
             clusterUrl = new URI(config.getUrl());
@@ -97,8 +96,15 @@ public class HttpClient extends AbstractSiteToSiteClient 
implements PeerStatusPr
             throw new IllegalArgumentException("Specified clusterUrl was: " + 
config.getUrl(), e);
         }
 
+        return new PeerDescription(clusterUrl.getHost(), 
siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure());
+    }
+
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription 
peerDescription) throws IOException {
+        // Each node should has the same URL structure and network 
reach-ability with the proxy configuration.
         try (final SiteToSiteRestApiClient apiClient = new 
SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) {
-            final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, 
clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
+            final String scheme = peerDescription.isSecure() ? "https" : 
"http";
+            final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, 
peerDescription.getHostname(), peerDescription.getPort());
 
             final int timeoutMillis = (int) 
config.getTimeout(TimeUnit.MILLISECONDS);
             apiClient.setConnectTimeoutMillis(timeoutMillis);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3586e04/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index f90aed9..a17deaa 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -41,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import javax.net.ssl.SSLContext;
 
@@ -360,7 +359,20 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
         }
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription 
peerDescription) throws IOException {
+    @Override
+    public PeerDescription getBootstrapPeerDescription() throws IOException {
+        final String hostname = clusterUrl.getHost();
+        final Integer port = siteInfoProvider.getSiteToSitePort();
+        if (port == null) {
+            throw new IOException("Remote instance of NiFi is not configured 
to allow RAW Socket site-to-site communications");
+        }
+
+        final boolean secure = siteInfoProvider.isSecure();
+        return new PeerDescription(hostname, port, secure);
+    }
+
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription 
peerDescription) throws IOException {
         final String hostname = peerDescription.getHostname();
         final int port = peerDescription.getPort();
 
@@ -414,50 +426,6 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
         return peerStatuses;
     }
 
-    @Override
-    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
-        final Set<PeerDescription> peersToRequestClusterInfoFrom = new 
HashSet<>();
-
-        // Look at all of the peers that we fetched last time.
-        final Set<PeerStatus> lastFetched = lastFetchedQueryablePeers;
-        if (lastFetched != null && !lastFetched.isEmpty()) {
-            lastFetched.stream().map(peer -> peer.getPeerDescription())
-                .forEach(desc -> peersToRequestClusterInfoFrom.add(desc));
-        }
-
-        // Always add the configured node info to the list of peers to 
communicate with
-        final String hostname = clusterUrl.getHost();
-        final Integer port = siteInfoProvider.getSiteToSitePort();
-        if (port == null) {
-            throw new IOException("Remote instance of NiFi is not configured 
to allow RAW Socket site-to-site communications");
-        }
-
-        final boolean secure = siteInfoProvider.isSecure();
-        peersToRequestClusterInfoFrom.add(new PeerDescription(hostname, port, 
secure));
-
-        Exception lastFailure = null;
-        for (final PeerDescription peerDescription : 
peersToRequestClusterInfoFrom) {
-            try {
-                final Set<PeerStatus> statuses = 
fetchRemotePeerStatuses(peerDescription);
-                lastFetchedQueryablePeers = statuses.stream()
-                    .filter(p -> p.isQueryForPeers())
-                    .collect(Collectors.toSet());
-
-                return statuses;
-            } catch (final Exception e) {
-                logger.warn("Could not communicate with {}:{} to determine 
which nodes exist in the remote NiFi cluster", peerDescription.getHostname(), 
peerDescription.getPort());
-                lastFailure = e;
-            }
-        }
-
-        final IOException ioe = new IOException("Unable to communicate with 
remote NiFi cluster in order to determine which nodes exist in the remote 
cluster");
-        if (lastFailure != null) {
-            ioe.addSuppressed(lastFailure);
-        }
-
-        throw ioe;
-    }
-
     private CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
         final PeerDescription description = peerStatus.getPeerDescription();
         return establishSiteToSiteConnection(description.getHostname(), 
description.getPort());

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3586e04/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
index 4c0f0d6..c434c7b 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java
@@ -29,11 +29,19 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.reducing;
 import static java.util.stream.Collectors.toMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 
 public class TestPeerSelector {
 
@@ -122,4 +130,91 @@ public class TestPeerSelector {
         logger.info("selectedCounts={}", selectedCounts);
         assertTrue("HasLots should get little", selectedCounts.get("HasLots") 
< selectedCounts.get("HasLittle"));
     }
+
+    private static class UnitTestSystemTime extends PeerSelector.SystemTime {
+        private long offset = 0;
+
+        @Override
+        long currentTimeMillis() {
+            return super.currentTimeMillis() + offset;
+        }
+    }
+
+    /**
+     * This test simulates a failure scenario of a remote NiFi cluster. It 
confirms that:
+     * <ol>
+     *     <li>PeerSelector uses the bootstrap node to fetch remote peer 
statuses at the initial attempt</li>
+     *     <li>PeerSelector uses one of query-able nodes lastly fetched 
successfully</li>
+     *     <li>PeerSelector can refresh remote peer statuses even if the 
bootstrap node is down</li>
+     *     <li>PeerSelector returns null as next peer when there's no peer 
available</li>
+     *     <li>PeerSelector always tries to fetch peer statuses at least from 
the bootstrap node, so that it can
+     *     recover when the node gets back online</li>
+     * </ol>
+     */
+    @Test
+    public void testFetchRemotePeerStatuses() throws IOException {
+
+        final Set<PeerStatus> peerStatuses = new HashSet<>();
+        final PeerDescription bootstrapNode = new PeerDescription("Node1", 
1111, true);
+        final PeerDescription node2 = new PeerDescription("Node2", 2222, true);
+        final PeerStatus bootstrapNodeStatus = new PeerStatus(bootstrapNode, 
10, true);
+        final PeerStatus node2Status = new PeerStatus(node2, 10, true);
+        peerStatuses.add(bootstrapNodeStatus);
+        peerStatuses.add(node2Status);
+
+        final PeerStatusProvider peerStatusProvider = 
Mockito.mock(PeerStatusProvider.class);
+        final PeerSelector peerSelector = new PeerSelector(peerStatusProvider, 
null);
+        final UnitTestSystemTime systemTime = new UnitTestSystemTime();
+        peerSelector.setSystemTime(systemTime);
+
+        
doReturn(bootstrapNode).when(peerStatusProvider).getBootstrapPeerDescription();
+        doAnswer(invocation -> {
+            final PeerDescription peerFetchStatusesFrom = 
invocation.getArgumentAt(0, PeerDescription.class);
+            if (peerStatuses.stream().filter(ps -> 
ps.getPeerDescription().equals(peerFetchStatusesFrom)).collect(Collectors.toSet()).size()
 > 0) {
+                // If the remote peer is running, then return available peer 
statuses.
+                return peerStatuses;
+            }
+            throw new IOException("Connection refused. " + 
peerFetchStatusesFrom + " is not running.");
+        
}).when(peerStatusProvider).fetchRemotePeerStatuses(any(PeerDescription.class));
+
+        // 1st attempt. It uses the bootstrap node.
+        peerSelector.refreshPeers();
+        PeerStatus peerStatus = 
peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
+
+        // Proceed time so that peer selector refresh statuses.
+        peerStatuses.remove(bootstrapNodeStatus);
+        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
+
+        // 2nd attempt.
+        peerSelector.refreshPeers();
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
+        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peerStatus.getPeerDescription());
+
+        // Proceed time so that peer selector refresh statuses.
+        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
+
+        // 3rd attempt.
+        peerSelector.refreshPeers();
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNotNull(peerStatus);
+        assertEquals("Node2 should be returned since node 2 is the only 
available node.", node2, peerStatus.getPeerDescription());
+
+        // Remove node2 to simulate that it goes down. There's no available 
node at this point.
+        peerStatuses.remove(node2Status);
+        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
+
+        peerSelector.refreshPeers();
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertNull("PeerSelector should return null as next peer status, since 
there's no available peer", peerStatus);
+
+        // Add node1 back. PeerSelector should be able to fetch peer statuses 
because it always tries to fetch at least from the bootstrap node.
+        peerStatuses.add(bootstrapNodeStatus);
+        systemTime.offset += TimeUnit.MILLISECONDS.convert(1, 
TimeUnit.MINUTES) + 1;
+
+        peerSelector.refreshPeers();
+        peerStatus = peerSelector.getNextPeerStatus(TransferDirection.RECEIVE);
+        assertEquals("Node1 should be returned since node 1 is the only 
available node.", bootstrapNode, peerStatus.getPeerDescription());
+    }
 }

Reply via email to