Merge remote-tracking branch 'remotes/origin/master' into ignite-zk-ce # Conflicts: # modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81014550 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81014550 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81014550 Branch: refs/heads/ignite-zk-ce Commit: 8101455034a723eadf94dabe4f5d8f1fc0ae5606 Parents: ed492a4 91be7af Author: sboikov <[email protected]> Authored: Mon Dec 18 10:38:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 18 10:38:07 2017 +0300 ---------------------------------------------------------------------- .../spi/communication/tcp/TcpCommunicationSpi.java | 3 +-- .../TcpCommunicationConnectionCheckFuture.java | 17 ++++++++++++++++- .../ignite/ml/knn/models/KNNModelFormat.java | 6 +++++- 3 files changed, 22 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81014550/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java index 99e1eca,0000000..6cb5622 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@@ -1,461 -1,0 +1,476 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.BitSet; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; ++import org.apache.ignite.events.DiscoveryEvent; ++import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteInternalFuture; ++import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; + ++import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; ++import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; ++ +/** + * + */ - public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject { ++public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener { + /** Session future. */ + public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + public static final ConnectionKey CONN_CHECK_DUMMY_KEY = new ConnectionKey(null, -1, -1); + + /** */ + private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater = + AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done"); + + /** */ + private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater = + AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt"); + + /** */ + private final AtomicInteger resCntr = new AtomicInteger(); + + /** */ + private final List<ClusterNode> nodes; + + /** */ + private volatile ConnectFuture[] futs; + + /** */ + private final GridNioServer nioSrvr; + + /** */ + private final TcpCommunicationSpi spi; + + /** */ + private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid(); + + /** */ + private final BitSet resBitSet; + + /** */ + private long endTime; + + /** */ + private final IgniteLogger log; + + /** + * @param spi SPI instance. + * @param log Logger. + * @param nioSrvr NIO server. + * @param nodes Nodes to check. + */ + public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi, + IgniteLogger log, + GridNioServer nioSrvr, + List<ClusterNode> nodes) + { + this.spi = spi; + this.log = log; + this.nioSrvr = nioSrvr; + this.nodes = nodes; + + resBitSet = new BitSet(nodes.size()); + } + ++ /** {@inheritDoc} */ ++ @Override public void onEvent(Event evt) { ++ assert evt instanceof DiscoveryEvent : evt; ++ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; ++ ++ } ++ + /** + * @param timeout Connect timeout. + */ + public void init(long timeout) { + ConnectFuture[] futs = new ConnectFuture[nodes.size()]; + + UUID locId = spi.getSpiContext().localNode().id(); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (!node.id().equals(locId)) { + if (spi.getSpiContext().node(node.id()) == null) { + receivedConnectionStatus(i, false); + + continue; + } + + Collection<InetSocketAddress> addrs; + + try { + addrs = spi.nodeAddresses(node, false); + } + catch (Exception e) { + U.error(log, "Failed to get node addresses: " + node, e); + + receivedConnectionStatus(i, false); + + continue; + } + + if (addrs.size() == 1) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i); + + fut.init(addrs.iterator().next()); + + futs[i] = fut; + } + else { + MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i); + + fut.init(addrs); + } + } + else + receivedConnectionStatus(i, true); + } + + this.futs = futs; + ++ spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED); ++ + if (!isDone()) { + endTime = System.currentTimeMillis() - timeout; + + spi.getSpiContext().addTimeoutObject(this); + } + } + + /** + * @param idx Node index. + * @param res Success flag. + */ + private void receivedConnectionStatus(int idx, boolean res) { + assert resCntr.get() < nodes.size(); + + synchronized (resBitSet) { + resBitSet.set(idx, res); + } + + if (resCntr.incrementAndGet() == nodes.size()) + onDone(resBitSet); + } + + /** + * @param nodeIdx Node index. + * @return Node ID. + */ + private UUID nodeId(int nodeIdx) { + return nodes.get(nodeIdx).id(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return timeoutObjId; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (!isDone()) + return; + + ConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + spi.getSpiContext().removeTimeoutObject(this); + + return true; + } + + return false; + } + + /** + * + */ + private interface ConnectFuture { + /** + * + */ + void onTimeout(); + } + + /** + * + */ + private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture { + /** */ + final int nodeIdx; + + /** */ + volatile int done; + + /** */ + Map<Integer, Object> sesMeta; + + /** */ + private SocketChannel ch; + + /** + * @param nodeIdx Node index. + */ + SingleAddressConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + } + + /** + * @param addr Node address. + */ + public void init(InetSocketAddress addr) { + boolean connect; + + try { + ch = SocketChannel.open(); + + ch.configureBlocking(false); + + ch.socket().setTcpNoDelay(true); + ch.socket().setKeepAlive(false); + + connect = ch.connect(addr); + } + catch (Exception e) { + finish(false); + + return; + } + + if (!connect) { + sesMeta = new GridLeanMap<>(3); + + // Set dummy key to identify connection-check outgoing connection. + sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, CONN_CHECK_DUMMY_KEY); + sesMeta.put(SES_FUT_META, this); + + nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { + if (fut.error() != null) + finish(false); + } + }); + } + } + + /** + * + */ + void cancel() { + if (finish(false)) + nioSrvr.cancelConnect(ch, sesMeta); + } + + /** {@inheritDoc} */ + public void onTimeout() { + cancel(); + } + + /** {@inheritDoc} */ + public void onConnected(UUID rmtNodeId) { + finish(nodeId(nodeIdx).equals(rmtNodeId)); + } + + /** + * @param res Result. + * @return {@code True} if result was set by this call. + */ + boolean finish(boolean res) { + if (connFutDoneUpdater.compareAndSet(this, 0, 1)) { + onStatusReceived(res); + + return true; + } + + return false; + } + + /** + * @param res Result. + */ + void onStatusReceived(boolean res) { + receivedConnectionStatus(nodeIdx, res); + } + } + + /** + * + */ + private class MultipleAddressesConnectFuture implements ConnectFuture { + /** */ + volatile int resCnt; + + /** */ + volatile SingleAddressConnectFuture[] futs; + + /** */ + final int nodeIdx; + + /** + * @param nodeIdx Node index. + */ + MultipleAddressesConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + SingleAddressConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** + * @param addrs Node addresses. + */ + void init(Collection<InetSocketAddress> addrs) { + SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()]; + + int idx = 0; + + for (InetSocketAddress addr : addrs) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) { + @Override void onStatusReceived(boolean res) { + receivedAddressStatus(res); + } + }; + + fut.init(addr); + + futs[idx++] = fut; + + if (done()) + return; + } + + this.futs = futs; + + // Close race. + if (done()) + cancelFutures(); + } + + /** + * @return {@code True} + */ + private boolean done() { + int resCnt0 = resCnt; + + return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length; + } + + /** + * + */ + private void cancelFutures() { + SingleAddressConnectFuture[] futs = this.futs; + + if (futs != null) { + for (int i = 0; i < futs.length; i++) { + SingleAddressConnectFuture fut = futs[i]; + + fut.cancel(); + } + } + } + + /** + * @param res Result. + */ + void receivedAddressStatus(boolean res) { + if (res) { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) { + receivedConnectionStatus(nodeIdx, true); + + cancelFutures(); // Cancel others connects if they are still in progress. + + return; + } + } + } + else { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + int resCnt1 = resCnt0 + 1; + + if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) { + if (resCnt1 == futs.length) + receivedConnectionStatus(nodeIdx, false); + + return; + } + } + } + } + } +}
