Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/497#discussion_r97561966
  
    --- Diff: 
utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
 ---
    @@ -69,86 +65,113 @@ public ReachableSocketFinder(Predicate<? super 
HostAndPort> socketTester, Listen
         }
     
         /**
    -     * 
    +     * Returns the first element of sockets that is reachable.
          * @param sockets The host-and-ports to test
          * @param timeout Max time to try to connect to the ip:port
          * 
          * @return The reachable ip:port
    -     * @throws NoSuchElementException If no ports accessible within the 
given time
    -     * @throws NullPointerException  If the sockets or duration is null
    +     * @throws NoSuchElementException If no ports are accessible within 
the given time
    +     * @throws NullPointerException  If sockets or timeout is null
          * @throws IllegalStateException  If the sockets to test is empty
          */
    -    public HostAndPort findOpenSocketOnNode(final Collection<? extends 
HostAndPort> sockets, Duration timeout) {
    +    public HostAndPort findOpenSocketOnNode(final Iterable<? extends 
HostAndPort> sockets, Duration timeout) {
             checkNotNull(sockets, "sockets");
    -        checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
    -        
    +        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets 
supplied");
    +        checkNotNull(timeout, "timeout");
             LOG.debug("blocking on any reachable socket in {} for {}", 
sockets, timeout);
    -
    -        final AtomicReference<HostAndPort> result = new 
AtomicReference<HostAndPort>();
    -        boolean passed = Repeater.create("socket-reachable")
    -                .limitTimeTo(timeout)
    -                .backoffTo(Duration.FIVE_SECONDS)
    -                .until(new Callable<Boolean>() {
    -                        @Override
    -                        public Boolean call() {
    -                            Optional<HostAndPort> reachableSocket = 
tryReachable(sockets, Duration.FIVE_SECONDS);
    -                            if (reachableSocket.isPresent()) {
    -                                result.compareAndSet(null, 
reachableSocket.get());
    -                                return true;
    -                            }
    -                            return false;
    -                        }})
    -                .run();
    -
    -        if (passed) {
    -            LOG.debug("<< socket {} opened", result);
    -            assert result.get() != null;
    -            return result.get();
    +        Iterator<HostAndPort> iter = findOpenSocketsOnNode(sockets, 
timeout).iterator();
    +        if (iter.hasNext()) {
    +            return iter.next();
             } else {
                 LOG.warn("No sockets in {} reachable after {}", sockets, 
timeout);
                 throw new NoSuchElementException("could not connect to any 
socket in " + sockets);
             }
         }
     
         /**
    -     * Checks if any any of the given HostAndPorts are reachable. It 
checks them all concurrently, and
    -     * returns the first that is reachable (or Optional.absent).
    +     * Returns an iterable of the elements in sockets that are reachable. 
The order of elements
    +     * in the iterable corresponds to the order of the elements in the 
input, not the order in which their
    +     * reachability was determined. Iterators are unmodifiable and are 
evaluated lazily.
    +     *
    +     * @param sockets The host-and-ports to test
    +     * @param timeout Max time to try to connect to each ip:port
    +     * @return An iterable containing all sockets that are reachable 
according to {@link #socketTester}.
    +     * @throws NullPointerException  If sockets or timeout is null
    +     * @throws IllegalStateException  If the sockets to test is empty
          */
    -    private Optional<HostAndPort> tryReachable(Collection<? extends 
HostAndPort> sockets, Duration timeout) {
    -        final AtomicReference<HostAndPort> reachableSocket = new 
AtomicReference<HostAndPort>();
    -        final CountDownLatch latch = new CountDownLatch(1);
    -        List<ListenableFuture<?>> futures = Lists.newArrayList();
    +    public Iterable<HostAndPort> findOpenSocketsOnNode(final Iterable<? 
extends HostAndPort> sockets, Duration timeout) {
    +        checkNotNull(sockets, "sockets");
    +        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets 
supplied");
    +        checkNotNull(timeout, "timeout");
    +        return Optional.presentInstances(tryReachable(sockets, timeout));
    +    }
    +
    +    /**
    +     * @return A lazily computed Iterable containing present values for 
the elements of sockets that are
    +     * reachable according to {@link #socketTester} and absent values for 
those not. Checks are concurrent
    +     * and the elements in the Iterable are ordered according to their 
position in sockets.
    +     */
    +    private Iterable<Optional<HostAndPort>> tryReachable(Iterable<? 
extends HostAndPort> sockets, final Duration timeout) {
    +        final List<ListenableFuture<Optional<HostAndPort>>> futures = 
Lists.newArrayList();
    +        final AtomicReference<Stopwatch> sinceFirstCompleted = new 
AtomicReference<>();
    +
             for (final HostAndPort socket : sockets) {
    -            futures.add(userExecutor.submit(new Runnable() {
    +            futures.add(userExecutor.submit(new 
Callable<Optional<HostAndPort>>() {
    +                @Override
    +                public Optional<HostAndPort> call() {
    +                    // Whether the socket was reachable (vs. the result of 
call, which is whether the repeater is done).
    +                    final AtomicBoolean theResultWeCareAbout = new 
AtomicBoolean();
    +                    Repeater.create("socket-reachable")
    +                            .limitTimeTo(timeout)
    +                            .backoffTo(Duration.FIVE_SECONDS)
    +                            .until(new Callable<Boolean>() {
    +                                @Override
    +                                public Boolean call() throws 
TimeoutException {
    +                                    boolean reachable = 
socketTester.apply(socket);
    +                                    if (reachable) {
    +                                        theResultWeCareAbout.set(true);
    +                                        return true;
    +                                    } else {
    +                                        // Run another check if nobody 
else has completed yet or another task has
    +                                        // completed but this one is still 
in its grace period.
    +                                        Stopwatch timerSinceFirst = 
sinceFirstCompleted.get();
    +                                        return timerSinceFirst != null && 
Duration.FIVE_SECONDS.subtract(Duration.of(timerSinceFirst)).isNegative();
    +                                    }
    +                                }
    +                            })
    +                            .run();
    +                    if (theResultWeCareAbout.get()) {
    +                        sinceFirstCompleted.compareAndSet(null, 
Stopwatch.createStarted());
    +                    }
    +                    return theResultWeCareAbout.get() ? 
Optional.of(socket) : Optional.<HostAndPort>absent();
    +                }
    +            }));
    +        }
    +
    +        return new Iterable<Optional<HostAndPort>>() {
    +            @Override
    +            public Iterator<Optional<HostAndPort>> iterator() {
    +                return new AbstractIterator<Optional<HostAndPort>>() {
    +                    int count = 0;
    +
                         @Override
    -                    public void run() {
    -                        try {
    -                            if (socketTester.apply(socket)) {
    -                                reachableSocket.compareAndSet(null, 
socket);
    -                                latch.countDown();
    +                    protected Optional<HostAndPort> computeNext() {
    +                        if (count < futures.size()) {
    +                            final Future<Optional<HostAndPort>> future = 
futures.get(count++);
    +                            try {
    +                                return 
future.get(timeout.toUnit(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
    +                            } catch (Exception e) {
    +                                Exceptions.propagateIfInterrupt(e);
    --- End diff --
    
    `e` is an `Exception` rather than a `Throwable` so I don't think this is a 
problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to