http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java deleted file mode 100644 index 802da02..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ /dev/null @@ -1,1160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.discovery.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.messages.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState.*; - -/** - * Base class for TCP discovery SPIs. - */ -abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements DiscoverySpi { - /** Default port to listen (value is <tt>47500</tt>). */ - public static final int DFLT_PORT = 47500; - - /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */ - public static final long DFLT_SOCK_TIMEOUT = 200; - - /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */ - public static final long DFLT_ACK_TIMEOUT = 50; - - /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */ - public static final long DFLT_NETWORK_TIMEOUT = 5000; - - /** Default value for thread priority (value is <tt>10</tt>). */ - public static final int DFLT_THREAD_PRI = 10; - - /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */ - public static final long DFLT_HEARTBEAT_FREQ = 100; - - /** Default size of topology snapshots history. */ - public static final int DFLT_TOP_HISTORY_SIZE = 1000; - - /** Response OK. */ - protected static final int RES_OK = 1; - - /** Response CONTINUE JOIN. */ - protected static final int RES_CONTINUE_JOIN = 100; - - /** Response WAIT. */ - protected static final int RES_WAIT = 200; - - /** Local address. */ - protected String locAddr; - - /** IP finder. */ - protected TcpDiscoveryIpFinder ipFinder; - - /** Socket operations timeout. */ - protected long sockTimeout = DFLT_SOCK_TIMEOUT; - - /** Message acknowledgement timeout. */ - protected long ackTimeout = DFLT_ACK_TIMEOUT; - - /** Network timeout. */ - protected long netTimeout = DFLT_NETWORK_TIMEOUT; - - /** Thread priority for all threads started by SPI. */ - protected int threadPri = DFLT_THREAD_PRI; - - /** Heartbeat messages issuing frequency. */ - protected long hbFreq = DFLT_HEARTBEAT_FREQ; - - /** Size of topology snapshots history. */ - protected int topHistSize = DFLT_TOP_HISTORY_SIZE; - - /** Grid discovery listener. */ - protected volatile DiscoverySpiListener lsnr; - - /** Data exchange. */ - protected DiscoverySpiDataExchange exchange; - - /** Metrics provider. */ - protected DiscoveryMetricsProvider metricsProvider; - - /** Local node attributes. */ - protected Map<String, Object> locNodeAttrs; - - /** Local node version. */ - protected IgniteProductVersion locNodeVer; - - /** Local node. */ - protected TcpDiscoveryNode locNode; - - /** Local host. */ - protected InetAddress locHost; - - /** Internal and external addresses of local node. */ - protected Collection<InetSocketAddress> locNodeAddrs; - - /** Socket timeout worker. */ - protected SocketTimeoutWorker sockTimeoutWorker; - - /** Discovery state. */ - protected TcpDiscoverySpiState spiState = DISCONNECTED; - - /** Start time of the very first grid node. */ - protected volatile long gridStartTime; - - /** Marshaller. */ - protected final Marshaller marsh = new JdkMarshaller(); - - /** Statistics. */ - protected final TcpDiscoveryStatistics stats = new TcpDiscoveryStatistics(); - - /** Logger. */ - @LoggerResource - protected IgniteLogger log; - - /** - * Inject resources - * - * @param ignite Ignite. - */ - @IgniteInstanceResource - @Override protected void injectResources(Ignite ignite) { - super.injectResources(ignite); - - // Inject resource. - if (ignite != null) - setLocalAddress(ignite.configuration().getLocalHost()); - } - - /** - * Sets local host IP address that discovery SPI uses. - * <p> - * If not provided, by default a first found non-loopback address - * will be used. If there is no non-loopback address available, - * then {@link InetAddress#getLocalHost()} will be used. - * - * @param locAddr IP address. - */ - @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { - // Injection should not override value already set by Spring or user. - if (this.locAddr == null) - this.locAddr = locAddr; - } - - /** - * Gets local address that was set to SPI with {@link #setLocalAddress(String)} method. - * - * @return local address. - */ - public String getLocalAddress() { - return locAddr; - } - - /** - * Gets IP finder for IP addresses sharing and storing. - * - * @return IP finder for IP addresses sharing and storing. - */ - public TcpDiscoveryIpFinder getIpFinder() { - return ipFinder; - } - - /** - * Sets IP finder for IP addresses sharing and storing. - * <p> - * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default. - * - * @param ipFinder IP finder. - */ - @IgniteSpiConfiguration(optional = true) - public void setIpFinder(TcpDiscoveryIpFinder ipFinder) { - this.ipFinder = ipFinder; - } - - /** - * Sets socket operations timeout. This timeout is used to limit connection time and - * write-to-socket time. - * <p> - * Note that when running Ignite on Amazon EC2, socket timeout must be set to a value - * significantly greater than the default (e.g. to {@code 30000}). - * <p> - * If not specified, default is {@link #DFLT_SOCK_TIMEOUT}. - * - * @param sockTimeout Socket connection timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setSocketTimeout(long sockTimeout) { - this.sockTimeout = sockTimeout; - } - - /** - * Sets timeout for receiving acknowledgement for sent message. - * <p> - * If acknowledgement is not received within this timeout, sending is considered as failed - * and SPI tries to repeat message sending. - * <p> - * If not specified, default is {@link #DFLT_ACK_TIMEOUT}. - * - * @param ackTimeout Acknowledgement timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - } - - /** - * Sets maximum network timeout to use for network operations. - * <p> - * If not specified, default is {@link #DFLT_NETWORK_TIMEOUT}. - * - * @param netTimeout Network timeout. - */ - @IgniteSpiConfiguration(optional = true) - public void setNetworkTimeout(long netTimeout) { - this.netTimeout = netTimeout; - } - - /** - * Sets thread priority. All threads within SPI will be started with it. - * <p> - * If not provided, default value is {@link #DFLT_THREAD_PRI} - * - * @param threadPri Thread priority. - */ - @IgniteSpiConfiguration(optional = true) - public void setThreadPriority(int threadPri) { - this.threadPri = threadPri; - } - - /** - * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages - * in configurable time interval to other nodes to notify them about its state. - * <p> - * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. - * - * @param hbFreq Heartbeat frequency in milliseconds. - */ - @IgniteSpiConfiguration(optional = true) - public void setHeartbeatFrequency(long hbFreq) { - this.hbFreq = hbFreq; - } - - /** - * @return Size of topology snapshots history. - */ - public long getTopHistorySize() { - return topHistSize; - } - - /** - * Sets size of topology snapshots history. Specified size should be greater than or equal to default size - * {@link #DFLT_TOP_HISTORY_SIZE}. - * - * @param topHistSize Size of topology snapshots history. - */ - @IgniteSpiConfiguration(optional = true) - public void setTopHistorySize(int topHistSize) { - if (topHistSize < DFLT_TOP_HISTORY_SIZE) { - U.warn(log, "Topology history size should be greater than or equal to default size. " + - "Specified size will not be set [curSize=" + this.topHistSize + ", specifiedSize=" + topHistSize + - ", defaultSize=" + DFLT_TOP_HISTORY_SIZE + ']'); - - return; - } - - this.topHistSize = topHistSize; - } - - /** {@inheritDoc} */ - @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) { - assert locNodeAttrs == null; - assert locNodeVer == null; - - if (log.isDebugEnabled()) { - log.debug("Node attributes to set: " + attrs); - log.debug("Node version to set: " + ver); - } - - locNodeAttrs = attrs; - locNodeVer = ver; - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onContextInitialized0(spiCtx); - - ipFinder.onSpiContextInitialized(spiCtx); - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - super.onContextDestroyed0(); - - if (ipFinder != null) - ipFinder.onSpiContextDestroyed(); - } - - /** {@inheritDoc} */ - @Override public ClusterNode getLocalNode() { - return locNode; - } - - /** {@inheritDoc} */ - @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { - this.lsnr = lsnr; - } - - /** {@inheritDoc} */ - @Override public void setDataExchange(DiscoverySpiDataExchange exchange) { - this.exchange = exchange; - } - - /** {@inheritDoc} */ - @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) { - this.metricsProvider = metricsProvider; - } - - /** {@inheritDoc} */ - @Override public long getGridStartTime() { - assert gridStartTime != 0; - - return gridStartTime; - } - - /** - * @param sockAddr Remote address. - * @return Opened socket. - * @throws IOException If failed. - */ - protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { - assert sockAddr != null; - - InetSocketAddress resolved = sockAddr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr; - - InetAddress addr = resolved.getAddress(); - - assert addr != null; - - Socket sock = new Socket(); - - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(true); - - sock.connect(resolved, (int)sockTimeout); - - writeToSocket(sock, U.IGNITE_HEADER); - - return sock; - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param data Raw data to write. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, byte[] data) throws IOException { - assert sock != null; - assert data != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - out.write(data); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed or write timed out. - * @throws IgniteCheckedException If marshalling failed. - */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K. - } - - /** - * Writes message to the socket. - * - * @param sock Socket. - * @param msg Message. - * @param bout Byte array output stream. - * @throws IOException If IO failed or write timed out. - * @throws IgniteCheckedException If marshalling failed. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout) - throws IOException, IgniteCheckedException { - assert sock != null; - assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - IOException err = null; - - try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Writes response to the socket. - * - * @param sock Socket. - * @param res Integer response. - * @throws IOException If IO failed or write timed out. - */ - @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, int res) throws IOException { - assert sock != null; - - SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - - sockTimeoutWorker.addTimeoutObject(obj); - - OutputStream out = sock.getOutputStream(); - - IOException err = null; - - try { - out.write(res); - - out.flush(); - } - catch (IOException e) { - err = e; - } - finally { - boolean cancelled = obj.cancel(); - - if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); - - // Throw original exception. - if (err != null) - throw err; - - if (!cancelled) - throw new SocketTimeoutException("Write timed out (socket was concurrently closed)."); - } - } - - /** - * Reads message from the socket limiting read time. - * - * @param sock Socket. - * @param in Input stream (in case socket stream was wrapped). - * @param timeout Socket timeout for this operation. - * @return Message. - * @throws IOException If IO failed or read timed out. - * @throws IgniteCheckedException If unmarshalling failed. - */ - protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader()); - } - catch (IOException | IgniteCheckedException e) { - if (X.hasCause(e, SocketTimeoutException.class)) - LT.warn(log, null, "Timed out waiting for message to be read (most probably, the reason is " + - "in long GC pauses on remote node. Current timeout: " + timeout + '.'); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Reads message delivery receipt from the socket. - * - * @param sock Socket. - * @param timeout Socket timeout for this operation. - * @return Receipt. - * @throws IOException If IO failed or read timed out. - */ - protected int readReceipt(Socket sock, long timeout) throws IOException { - assert sock != null; - - int oldTimeout = sock.getSoTimeout(); - - try { - sock.setSoTimeout((int)timeout); - - int res = sock.getInputStream().read(); - - if (res == -1) - throw new EOFException(); - - return res; - } - catch (SocketTimeoutException e) { - LT.warn(log, null, "Timed out waiting for message delivery receipt (most probably, the reason is " + - "in long GC pauses on remote node; consider tuning GC and increasing 'ackTimeout' " + - "configuration property). Will retry to send message with increased timeout. " + - "Current timeout: " + timeout + '.'); - - stats.onAckTimeout(); - - throw e; - } - finally { - // Quietly restore timeout. - try { - sock.setSoTimeout(oldTimeout); - } - catch (SocketException ignored) { - // No-op. - } - } - } - - /** - * Resolves addresses registered in the IP finder, removes duplicates and local host - * address and returns the collection of. - * - * @return Resolved addresses without duplicates and local address (potentially - * empty but never null). - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { - List<InetSocketAddress> res = new ArrayList<>(); - - Collection<InetSocketAddress> addrs; - - // Get consistent addresses collection. - while (true) { - try { - addrs = registeredAddresses(); - - break; - } - catch (IgniteSpiException e) { - LT.error(log, e, "Failed to get registered addresses from IP finder on start " + - "(retrying every 2000 ms)."); - } - - try { - U.sleep(2000); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException("Thread has been interrupted.", e); - } - } - - for (InetSocketAddress addr : addrs) { - assert addr != null; - - try { - InetSocketAddress resolved = addr.isUnresolved() ? - new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; - - if (locNodeAddrs == null || !locNodeAddrs.contains(resolved)) - res.add(resolved); - } - catch (UnknownHostException ignored) { - LT.warn(log, null, "Failed to resolve address from IP finder (host is unknown): " + addr); - - // Add address in any case. - res.add(addr); - } - } - - if (!res.isEmpty()) - Collections.shuffle(res); - - return res; - } - - /** - * Gets addresses registered in the IP finder, initializes addresses having no - * port (or 0 port) with {@link #DFLT_PORT}. - * - * @return Registered addresses. - * @throws org.apache.ignite.spi.IgniteSpiException If an error occurs. - */ - protected Collection<InetSocketAddress> registeredAddresses() throws IgniteSpiException { - Collection<InetSocketAddress> res = new ArrayList<>(); - - for (InetSocketAddress addr : ipFinder.getRegisteredAddresses()) { - if (addr.getPort() == 0) { - // TcpDiscoveryNode.discoveryPort() returns an correct port for a server node and 0 for client node. - int port = locNode.discoveryPort() != 0 ? locNode.discoveryPort() : DFLT_PORT; - - addr = addr.isUnresolved() ? new InetSocketAddress(addr.getHostName(), port) : - new InetSocketAddress(addr.getAddress(), port); - } - - res.add(addr); - } - - return res; - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) { - assert msg != null; - - return new IgniteSpiException("Local node has the same ID as existing node in topology " + - "(fix configuration and restart local node) [localNode=" + locNode + - ", existingNode=" + msg.node() + ']'); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException authenticationFailedError(TcpDiscoveryAuthFailedMessage msg) { - assert msg != null; - - return new IgniteSpiException(new IgniteAuthenticationException("Authentication failed [nodeId=" + - msg.creatorNodeId() + ", addr=" + msg.address().getHostAddress() + ']')); - } - - /** - * @param msg Message. - * @return Error. - */ - protected IgniteSpiException checkFailedError(TcpDiscoveryCheckFailedMessage msg) { - assert msg != null; - - return versionCheckFailed(msg) ? new IgniteSpiVersionCheckException(msg.error()) : - new IgniteSpiException(msg.error()); - } - - /** - * @param msg Message. - * @return Whether delivery of the message is ensured. - */ - protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - return U.getAnnotation(msg.getClass(), TcpDiscoveryEnsureDelivery.class) != null; - } - - /** - * @param msg Failed message. - * @return {@code True} if specified failed message relates to version incompatibility, {@code false} otherwise. - * @deprecated Parsing of error message was used for preserving backward compatibility. We should remove it - * and create separate message for failed version check with next major release. - */ - @Deprecated - private static boolean versionCheckFailed(TcpDiscoveryCheckFailedMessage msg) { - return msg.error().contains("versions are not compatible"); - } - - /** - * @param joiningNodeID Joining node ID. - * @param nodeId Remote node ID for which data is provided. - * @param data Collection of marshalled discovery data objects from different components. - * @param clsLdr Class loader for discovery data unmarshalling. - */ - protected void onExchange(UUID joiningNodeID, - UUID nodeId, - Map<Integer, byte[]> data, - ClassLoader clsLdr) - { - Map<Integer, Serializable> data0 = U.newHashMap(data.size()); - - for (Map.Entry<Integer, byte[]> entry : data.entrySet()) { - try { - Serializable compData = marsh.unmarshal(entry.getValue(), clsLdr); - - data0.put(entry.getKey(), compData); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e); - } - } - - exchange.onExchange(joiningNodeID, nodeId, data0); - } - - /** - * Handles sockets timeouts. - */ - protected class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() { - @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-disco-sock-timeout-worker", log); - - setPriority(threadPri); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } - - /** - * @param timeoutObj Timeout object to remove. - */ - public void removeTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); - - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - SocketTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); - - if (timeoutObj.onTimeout()) { - LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - - stats.onSocketTimeout(); - } - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - SocketTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } - } - } - } - - /** - * Socket timeout object. - */ - private static class SocketTimeoutObject { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - - /** */ - private final long id = idGen.incrementAndGet(); - - /** */ - private final Socket sock; - - /** */ - private final long endTime; - - /** */ - private final AtomicBoolean done = new AtomicBoolean(); - - /** - * @param sock Socket. - * @param endTime End time. - */ - SocketTimeoutObject(Socket sock, long endTime) { - assert sock != null; - assert endTime > 0; - - this.sock = sock; - this.endTime = endTime; - } - - /** - * @return {@code True} if object has not yet been processed. - */ - boolean cancel() { - return done.compareAndSet(false, true); - } - - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { - if (done.compareAndSet(false, true)) { - // Close socket - timeout occurred. - U.closeQuiet(sock); - - return true; - } - - return false; - } - - /** - * @return End time. - */ - long endTime() { - return endTime; - } - - /** - * @return ID. - */ - long id() { - return id; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SocketTimeoutObject.class, this); - } - } - - /** - * Base class for message workers. - */ - protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - - /** Message queue. */ - private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>(); - - /** Backed interrupted flag. */ - private volatile boolean interrupted; - - /** - * @param name Thread name. - */ - protected MessageWorkerAdapter(String name) { - super(gridName, name, log); - - setPriority(threadPri); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']'); - - while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); - - if (msg == null) - continue; - - processMessage(msg); - } - } - - /** {@inheritDoc} */ - @Override public void interrupt() { - interrupted = true; - - super.interrupt(); - } - - /** {@inheritDoc} */ - @Override public boolean isInterrupted() { - return interrupted || super.isInterrupted(); - } - - /** - * @return Current queue size. - */ - int queueSize() { - return queue.size(); - } - - /** - * Adds message to queue. - * - * @param msg Message to add. - */ - void addMessage(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - if (msg instanceof TcpDiscoveryHeartbeatMessage) - queue.addFirst(msg); - else - queue.add(msg); - - if (log.isDebugEnabled()) - log.debug("Message has been added to queue: " + msg); - } - - /** - * @param msg Message. - */ - protected abstract void processMessage(TcpDiscoveryAbstractMessage msg); - - /** - * @param sock Socket. - * @param msg Message. - * @throws IOException If IO failed. - * @throws IgniteCheckedException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) - throws IOException, IgniteCheckedException { - bout.reset(); - - TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout); - } - } - - /** - * - */ - protected class SocketMultiConnector implements AutoCloseable { - /** */ - private int connInProgress; - - /** */ - private final ExecutorService executor; - - /** */ - private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc; - - /** - * @param addrs Addresses. - * @param retryCnt Retry count. - */ - public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) { - connInProgress = addrs.size(); - - executor = Executors.newFixedThreadPool(Math.min(1, addrs.size())); - - completionSrvc = new ExecutorCompletionService<>(executor); - - for (final InetSocketAddress addr : addrs) { - completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() { - @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() { - Exception ex = null; - Socket sock = null; - - for (int i = 0; i < retryCnt; i++) { - if (Thread.currentThread().isInterrupted()) - return null; // Executor is shutdown. - - try { - sock = openSocket(addr); - - break; - } - catch (Exception e) { - ex = e; - } - } - - return new GridTuple3<>(addr, sock, ex); - } - }); - } - } - - /** - * - */ - @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() { - if (connInProgress == 0) - return null; - - try { - Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take(); - - connInProgress--; - - return fut.get(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteSpiException("Thread has been interrupted.", e); - } - catch (ExecutionException e) { - throw new IgniteSpiException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() { - List<Runnable> unstartedTasks = executor.shutdownNow(); - - connInProgress -= unstartedTasks.size(); - - if (connInProgress > 0) { - Thread thread = new Thread(new Runnable() { - @Override public void run() { - try { - executor.awaitTermination(5, TimeUnit.MINUTES); - - Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut; - - while ((fut = completionSrvc.poll()) != null) { - try { - GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get(); - - if (tuple3 != null) - IgniteUtils.closeQuiet(tuple3.get2()); - } - catch (ExecutionException ignore) { - - } - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new RuntimeException(e); - } - } - }); - - thread.setDaemon(true); - - thread.start(); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java index df9d0f4..f338fab 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java @@ -272,4 +272,13 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { */ @MXBeanDescription("Dump debug info.") public void dumpDebugInfo(); + + /** + * Whether or not discovery is started in client mode. + * + * @return {@code true} if node is in client mode. + * @throws IllegalStateException If discovery SPI is not started. + */ + @MXBeanDescription("Client mode.") + public boolean isClientMode() throws IllegalStateException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index bb8f051..cc61c9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -448,11 +448,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste ClusterMetrics metrics = this.metrics; - if (metrics != null) { - mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE]; - - ClusterMetricsSnapshot.serialize(mtr, 0, metrics); - } + if (metrics != null) + mtr = ClusterMetricsSnapshot.serialize(metrics); U.writeByteArray(out, mtr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index e866504..e9eaa1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -32,7 +32,7 @@ import java.util.concurrent.locks.*; */ public class TcpDiscoveryNodesRing { /** Visible nodes filter. */ - private static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { + public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { @Override public boolean apply(TcpDiscoveryNode node) { return node.visible(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java index 51ad7b4..95758e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinder.java @@ -31,7 +31,7 @@ public interface TcpDiscoveryIpFinder { * method is completed, SPI context can be stored for future access. * * @param spiCtx Spi context. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException; @@ -46,7 +46,7 @@ public interface TcpDiscoveryIpFinder { * Initializes addresses discovery SPI binds to. * * @param addrs Addresses discovery SPI binds to. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; @@ -54,7 +54,7 @@ public interface TcpDiscoveryIpFinder { * Gets all addresses registered in this finder. * * @return All known addresses, potentially empty, but never {@code null}. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException; @@ -76,7 +76,7 @@ public interface TcpDiscoveryIpFinder { * is already registered. * * @param addrs Addresses to register. Not {@code null} and not empty. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; @@ -87,7 +87,7 @@ public interface TcpDiscoveryIpFinder { * registered quietly (just no-op). * * @param addrs Addresses to unregister. Not {@code null} and not empty. - * @throws org.apache.ignite.spi.IgniteSpiException In case of error. + * @throws IgniteSpiException In case of error. */ public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index 6cf06ab..a992620 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -26,6 +26,8 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.jdk.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.jetbrains.annotations.*; @@ -254,6 +256,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { "(it is recommended in production to specify at least one address in " + "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); + boolean clientMode; + + if (ignite != null) { // Can be null if used in tests without starting Ignite. + DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi(); + + if (!(discoSpi instanceof TcpDiscoverySpi)) + throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " + + "TcpDiscoverySpi: " + discoSpi); + + clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode(); + } + else + clientMode = false; + InetAddress mcastAddr; try { @@ -296,7 +312,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { if (!addr.isLoopbackAddress()) { try { - addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); + if (!clientMode) + addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); reqItfs.add(addr); } @@ -309,20 +326,24 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { } } - if (addrSnds.isEmpty()) { - try { - // Create non-bound socket if local host is loopback or failed to create sockets explicitly - // bound to interfaces. - addrSnds.add(new AddressSender(mcastAddr, null, addrs)); - } - catch (IOException e) { - throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); + if (!clientMode) { + if (addrSnds.isEmpty()) { + try { + // Create non-bound socket if local host is loopback or failed to create sockets explicitly + // bound to interfaces. + addrSnds.add(new AddressSender(mcastAddr, null, addrs)); + } + catch (IOException e) { + throw new IgniteSpiException("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ']', e); + } } - } - for (AddressSender addrSnd :addrSnds) - addrSnd.start(); + for (AddressSender addrSnd : addrSnds) + addrSnd.start(); + } + else + assert addrSnds.isEmpty() : addrSnds; Collection<InetSocketAddress> ret; @@ -495,11 +516,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** {@inheritDoc} */ @Override public void close() { - for (AddressSender addrSnd : addrSnds) - U.interrupt(addrSnd); + if (addrSnds != null) { + for (AddressSender addrSnd : addrSnds) + U.interrupt(addrSnd); - for (AddressSender addrSnd : addrSnds) - U.join(addrSnd, log); + for (AddressSender addrSnd : addrSnds) + U.join(addrSnd, log); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 1a00359..145b518 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -52,9 +52,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** Topology version. */ private long topVer; - /** Destination client node ID. */ - private UUID destClientNodeId; - /** Flags. */ @GridToStringExclude private int flags; @@ -178,20 +175,6 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** - * @return Destination client node ID. - */ - public UUID destinationClientNodeId() { - return destClientNodeId; - } - - /** - * @param destClientNodeId Destination client node ID. - */ - public void destinationClientNodeId(UUID destClientNodeId) { - this.destClientNodeId = destClientNodeId; - } - - /** * @return Pending message index. */ public short pendingIndex() { @@ -232,6 +215,13 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { flags &= ~mask; } + /** + * @return {@code true} if message must be added to head of queue. + */ + public boolean highPriority() { + return false; + } + /** {@inheritDoc} */ @Override public final boolean equals(Object obj) { if (this == obj) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java new file mode 100644 index 0000000..95ac340 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Heartbeat message. + * <p> + * Client sends his hearbeats in this message. + */ +public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final byte[] metrics; + + /** + * Constructor. + * + * @param creatorNodeId Creator node. + */ + public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) { + super(creatorNodeId); + + this.metrics = ClusterMetricsSnapshot.serialize(metrics); + } + + /** + * Gets metrics map. + * + * @return Metrics map. + */ + public ClusterMetrics metrics() { + return ClusterMetricsSnapshot.deserialize(metrics, 0); + } + + /** {@inheritDoc} */ + @Override public boolean highPriority() { + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java new file mode 100644 index 0000000..f9f164d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ping request. + */ +public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Pinged client node ID. */ + private final UUID nodeToPing; + + /** + * @param creatorNodeId Creator node ID. + * @param nodeToPing Pinged client node ID. + */ + public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) { + super(creatorNodeId); + + this.nodeToPing = nodeToPing; + } + + /** + * @return Pinged client node ID. + */ + @Nullable public UUID nodeToPing() { + return nodeToPing; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java new file mode 100644 index 0000000..26a2b00 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ping request. + */ +public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Pinged client node ID. */ + private final UUID nodeToPing; + + /** */ + private final boolean res; + + /** + * @param creatorNodeId Creator node ID. + * @param nodeToPing Pinged client node ID. + */ + public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) { + super(creatorNodeId); + + this.nodeToPing = nodeToPing; + this.res = res; + } + + /** + * @return Pinged client node ID. + */ + @Nullable public UUID nodeToPing() { + return nodeToPing; + } + + /** + * @return Result of ping. + */ + public boolean result() { + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 4e42f2d..0739c1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -18,29 +18,34 @@ package org.apache.ignite.spi.discovery.tcp.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** * Wrapped for custom message. */ +@TcpDiscoveryRedirectToClient @TcpDiscoveryEnsureDelivery public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private transient Serializable msg; + private transient volatile DiscoverySpiCustomMessage msg; /** */ - private final byte[] msgBytes; + private byte[] msgBytes; /** * @param creatorNodeId Creator node id. + * @param msg Message. * @param msgBytes Serialized message. */ - public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, Serializable msg, byte[] msgBytes) { + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg, + @NotNull byte[] msgBytes) { super(creatorNodeId); this.msg = msg; @@ -48,17 +53,33 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage } /** - * @return Message. + * @return Serialized message. */ - public Serializable message() { - return msg; + public byte[] messageBytes() { + return msgBytes; } /** - * @return Serialized message. + * @param msg Message. + * @param msgBytes Serialized message. */ - public byte[] messageBytes() { - return msgBytes; + public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) { + this.msg = msg; + this.msgBytes = msgBytes; + } + + /** + * @return Deserialized message, + * @throws java.lang.Throwable if unmarshal failed. + */ + @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh) throws Throwable { + if (msg == null) { + msg = marsh.unmarshal(msgBytes, U.gridClassLoader()); + + assert msg != null; + } + + return msg; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index bafde9f..f721401 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -58,13 +58,6 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new HashMap<>(); /** - * Public default no-arg constructor for {@link Externalizable} interface. - */ - public TcpDiscoveryHeartbeatMessage() { - // No-op. - } - - /** * Constructor. * * @param creatorNodeId Creator node. @@ -211,22 +204,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString()); + @Override public boolean highPriority() { + return true; } - /** - * @param metrics Metrics. - * @return Serialized metrics. - */ - private static byte[] serializeMetrics(ClusterMetrics metrics) { - assert metrics != null; - - byte[] buf = new byte[ClusterMetricsSnapshot.METRICS_SIZE]; - - ClusterMetricsSnapshot.serialize(buf, 0, metrics); - - return buf; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryHeartbeatMessage.class, this, "super", super.toString()); } /** @@ -273,7 +257,7 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { public MetricsSet(ClusterMetrics metrics) { assert metrics != null; - this.metrics = serializeMetrics(metrics); + this.metrics = ClusterMetricsSnapshot.serialize(metrics); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 5a71eb3..1d974e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -17,7 +17,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -34,6 +36,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess private final UUID nodeId; /** + * Client node can not get discovery data from TcpDiscoveryNodeAddedMessage, we have to pass discovery data in + * TcpDiscoveryNodeAddFinishedMessage + */ + @GridToStringExclude + private Map<UUID, Map<Integer, byte[]>> clientDiscoData; + + /** */ + @GridToStringExclude + private Map<String, Object> clientNodeAttrs; + + /** * Constructor. * * @param creatorNodeId ID of the creator node (coordinator). @@ -54,6 +67,36 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess return nodeId; } + /** + * @return Discovery data for joined client. + */ + public Map<UUID, Map<Integer, byte[]>> clientDiscoData() { + return clientDiscoData; + } + + /** + * @param clientDiscoData Discovery data for joined client. + */ + public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) { + this.clientDiscoData = clientDiscoData; + + assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId); + } + + /** + * @return Client node attributes. + */ + public Map<String, Object> clientNodeAttributes() { + return clientNodeAttrs; + } + + /** + * @param clientNodeAttrs New client node attributes. + */ + public void clientNodeAttributes(Map<String, Object> clientNodeAttrs) { + this.clientNodeAttrs = clientNodeAttrs; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeAddFinishedMessage.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index a9303f3..2a14158 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -148,7 +148,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { * * @return Map with topology snapshots history. */ - @Nullable public Map<Long, Collection<ClusterNode>> topologyHistory() { + public Map<Long, Collection<ClusterNode>> topologyHistory() { return topHist; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java index de5b0a7..f17c91b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.util.*; @@ -47,4 +48,9 @@ public class TcpDiscoveryPingRequest extends TcpDiscoveryAbstractMessage { @Nullable public UUID clientNodeId() { return clientNodeId; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryPingRequest.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java index 6396764..02b2d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryPingResponse.java @@ -17,7 +17,8 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.*; +import org.apache.ignite.internal.util.typedef.internal.*; + import java.util.*; /** @@ -31,13 +32,6 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage { private boolean clientExists; /** - * For {@link Externalizable}. - */ - public TcpDiscoveryPingResponse() { - // No-op. - } - - /** * @param creatorNodeId Creator node ID. */ public TcpDiscoveryPingResponse(UUID creatorNodeId) { @@ -57,4 +51,9 @@ public class TcpDiscoveryPingResponse extends TcpDiscoveryAbstractMessage { public boolean clientExists() { return clientExists; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryPingResponse.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java index e7db285..7a88426 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java @@ -387,15 +387,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, Space space = space(spaceName, false); - if (space == null) - return; - - byte[] val = space.remove(key, c != null); + byte[] val = space == null ? null : space.remove(key, c != null); if (c != null) c.apply(val); - notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName); + if (space != null) + notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java new file mode 100644 index 0000000..467349f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODE_CNT = 4; + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final String CACHE3 = "cache3"; + + /** */ + private static final String CACHE4 = "cache4"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (gridName.equals(getTestGridName(NODE_CNT - 1))) + cfg.setClientMode(true); + + CacheConfiguration ccfg1 = new CacheConfiguration(); + + ccfg1.setBackups(1); + ccfg1.setName(CACHE1); + ccfg1.setAffinity(new RendezvousAffinityFunction()); + ccfg1.setNodeFilter(new TestNodesFilter()); + + CacheConfiguration ccfg2 = new CacheConfiguration(); + + ccfg2.setBackups(1); + ccfg2.setName(CACHE2); + ccfg2.setAffinity(new RendezvousAffinityFunction()); + + CacheConfiguration ccfg3 = new CacheConfiguration(); + + ccfg3.setBackups(1); + ccfg3.setName(CACHE3); + ccfg3.setAffinity(new FairAffinityFunction()); + ccfg3.setNodeFilter(new TestNodesFilter()); + + CacheConfiguration ccfg4 = new CacheConfiguration(); + + ccfg4.setCacheMode(REPLICATED); + ccfg4.setName(CACHE4); + ccfg4.setNodeFilter(new TestNodesFilter()); + + cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeNotInAffinity() throws Exception { + checkCache(CACHE1, 2); + + checkCache(CACHE2, 2); + + checkCache(CACHE3, 2); + + checkCache(CACHE4, 3); + + Ignite client = ignite(NODE_CNT - 1); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(0); + + ccfg.setNodeFilter(new TestNodesFilter()); + + try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg)) { + checkCache(null, 1); + } + + try (IgniteCache<Integer, Integer> cache = client.createCache(ccfg, new NearCacheConfiguration())) { + checkCache(null, 1); + } + } + + /** + * @param cacheName Cache name. + * @param expNodes Expected number of nodes per partition. + */ + private void checkCache(String cacheName, int expNodes) { + log.info("Test cache: " + cacheName); + + Ignite client = ignite(NODE_CNT - 1); + + assertTrue(client.configuration().isClientMode()); + + ClusterNode clientNode = client.cluster().localNode(); + + for (int i = 0; i < NODE_CNT; i++) { + Ignite ignite = ignite(i); + + Affinity<Integer> aff = ignite.affinity(cacheName); + + for (int part = 0; part < aff.partitions(); part++) { + Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part); + + assertEquals(expNodes, nodes.size()); + + assertFalse(nodes.contains(clientNode)); + } + } + } + + /** + * + */ + private static class TestNodesFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + + assertNotNull(attr); + + assertFalse(attr); + + return true; + } + } +}
