# IGNITE-943 Extract SocketMultiConnector class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2f169f57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2f169f57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2f169f57 Branch: refs/heads/ignite-943 Commit: 2f169f5799fe9fca46b022f9c7f55dc37b816a74 Parents: f43cbbb Author: sevdokimov <[email protected]> Authored: Tue May 26 19:16:43 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Tue May 26 19:16:43 2015 +0300 ---------------------------------------------------------------------- .../spi/discovery/tcp/SocketMultiConnector.java | 144 +++++++++++++++++++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 116 --------------- 3 files changed, 145 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java new file mode 100644 index 0000000..b988ceb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.spi.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Allow to connect to addresses parallel. + */ +class SocketMultiConnector implements AutoCloseable { + /** */ + private int connInProgress; + + /** */ + private final ExecutorService executor; + + /** */ + private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc; + + /** + * @param spi Discovery SPI. + * @param addrs Addresses. + * @param retryCnt Retry count. + */ + SocketMultiConnector(final TcpDiscoverySpiAdapter spi, Collection<InetSocketAddress> addrs, + final int retryCnt) { + connInProgress = addrs.size(); + + executor = Executors.newFixedThreadPool(Math.min(1, addrs.size())); + + completionSrvc = new ExecutorCompletionService<>(executor); + + for (final InetSocketAddress addr : addrs) { + completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() { + @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() { + Exception ex = null; + Socket sock = null; + + for (int i = 0; i < retryCnt; i++) { + if (Thread.currentThread().isInterrupted()) + return null; // Executor is shutdown. + + try { + sock = spi.openSocket(addr); + + break; + } + catch (Exception e) { + ex = e; + } + } + + return new GridTuple3<>(addr, sock, ex); + } + }); + } + } + + /** + * + */ + @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() { + if (connInProgress == 0) + return null; + + try { + Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take(); + + connInProgress--; + + return fut.get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteSpiException("Thread has been interrupted.", e); + } + catch (ExecutionException e) { + throw new IgniteSpiException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + List<Runnable> unstartedTasks = executor.shutdownNow(); + + connInProgress -= unstartedTasks.size(); + + if (connInProgress > 0) { + Thread thread = new Thread(new Runnable() { + @Override public void run() { + try { + executor.awaitTermination(5, TimeUnit.MINUTES); + + Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut; + + while ((fut = completionSrvc.poll()) != null) { + try { + GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get(); + + if (tuple3 != null) + IgniteUtils.closeQuiet(tuple3.get2()); + } + catch (ExecutionException ignore) { + + } + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new RuntimeException(e); + } + } + }); + + thread.setDaemon(true); + + thread.start(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 37a07d6..8ceac1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1344,7 +1344,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean retry = false; Collection<Exception> errs = new ArrayList<>(); - try (SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2)) { + try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) { GridTuple3<InetSocketAddress, Socket, Exception> tuple; while ((tuple = multiConnector.next()) != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 4349ebc..ddbea0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -22,7 +22,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -39,7 +38,6 @@ import org.jetbrains.annotations.*; import java.io.*; import java.net.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** @@ -1184,118 +1182,4 @@ public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements } } - /** - * Allow to connect to addresses parallel. - */ - protected class SocketMultiConnector implements AutoCloseable { - /** */ - private int connInProgress; - - /** */ - private final ExecutorService executor; - - /** */ - private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc; - - /** - * @param addrs Addresses. - * @param retryCnt Retry count. - */ - public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) { - connInProgress = addrs.size(); - - executor = Executors.newFixedThreadPool(Math.min(1, addrs.size())); - - completionSrvc = new ExecutorCompletionService<>(executor); - - for (final InetSocketAddress addr : addrs) { - completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() { - @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() { - Exception ex = null; - Socket sock = null; - - for (int i = 0; i < retryCnt; i++) { - if (Thread.currentThread().isInterrupted()) - return null; // Executor is shutdown. - - try { - sock = openSocket(addr); - - break; - } - catch (Exception e) { - ex = e; - } - } - - return new GridTuple3<>(addr, sock, ex); - } - }); - } - } - - /** - * - */ - @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() { - if (connInProgress == 0) - return null; - - try { - Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take(); - - connInProgress--; - - return fut.get(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteSpiException("Thread has been interrupted.", e); - } - catch (ExecutionException e) { - throw new IgniteSpiException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - List<Runnable> unstartedTasks = executor.shutdownNow(); - - connInProgress -= unstartedTasks.size(); - - if (connInProgress > 0) { - Thread thread = new Thread(new Runnable() { - @Override public void run() { - try { - executor.awaitTermination(5, TimeUnit.MINUTES); - - Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut; - - while ((fut = completionSrvc.poll()) != null) { - try { - GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get(); - - if (tuple3 != null) - IgniteUtils.closeQuiet(tuple3.get2()); - } - catch (ExecutionException ignore) { - - } - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new RuntimeException(e); - } - } - }); - - thread.setDaemon(true); - - thread.start(); - } - } - } }
