http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 08a9937..733ae81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -176,9 +176,11 @@ public class GridNioRecoveryDescriptor { while (acked < rcvCnt) { GridNioFuture<?> fut = msgFuts.pollFirst(); - assert fut != null; + assert fut != null : "Missed message future [rcvCnt=" + rcvCnt + + ", acked=" + acked + + ", desc=" + this + ']'; - assert fut.isDone(); + assert fut.isDone() : fut; acked++; } @@ -239,9 +241,12 @@ public class GridNioRecoveryDescriptor { * @param rcvCnt Number of messages received by remote node. */ public void onHandshake(long rcvCnt) { - ackReceived(rcvCnt); + synchronized (this) { + if (!nodeLeft) + ackReceived(rcvCnt); - resendCnt = msgFuts.size(); + resendCnt = msgFuts.size(); + } } /**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java deleted file mode 100644 index 72c20f8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.nio; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.nio.*; -import java.util.*; -import java.util.concurrent.locks.*; - -/** - * Grid client for NIO server. - */ -public class GridTcpCommunicationClient extends GridAbstractCommunicationClient { - /** Socket. */ - private final Socket sock; - - /** Output stream. */ - private final UnsafeBufferedOutputStream out; - - /** Minimum buffered message count. */ - private final int minBufferedMsgCnt; - - /** Communication buffer size ratio. */ - private final double bufSizeRatio; - - /** */ - private final ByteBuffer writeBuf; - - /** */ - private final MessageFormatter formatter; - - /** - * @param metricsLsnr Metrics listener. - * @param addr Address. - * @param locHost Local address. - * @param connTimeout Connect timeout. - * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option. - * @param sockRcvBuf Socket receive buffer. - * @param sockSndBuf Socket send buffer. - * @param bufSize Buffer size (or {@code 0} to disable buffer). - * @param minBufferedMsgCnt Minimum buffered message count. - * @param bufSizeRatio Communication buffer size ratio. - * @param formatter Message formatter. - * @throws IgniteCheckedException If failed. - */ - public GridTcpCommunicationClient( - GridNioMetricsListener metricsLsnr, - InetSocketAddress addr, - InetAddress locHost, - long connTimeout, - boolean tcpNoDelay, - int sockRcvBuf, - int sockSndBuf, - int bufSize, - int minBufferedMsgCnt, - double bufSizeRatio, - MessageFormatter formatter - ) throws IgniteCheckedException { - super(metricsLsnr); - - assert metricsLsnr != null; - assert addr != null; - assert locHost != null; - assert connTimeout >= 0; - assert bufSize >= 0; - - A.ensure(minBufferedMsgCnt >= 0, - "Value of minBufferedMessageCount property cannot be less than zero."); - A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1, - "Value of bufSizeRatio property must be between 0 and 1 (exclusive)."); - - this.minBufferedMsgCnt = minBufferedMsgCnt; - this.bufSizeRatio = bufSizeRatio; - this.formatter = formatter; - - writeBuf = ByteBuffer.allocate(8 << 10); - - writeBuf.order(ByteOrder.nativeOrder()); - - sock = new Socket(); - - boolean success = false; - - try { - sock.bind(new InetSocketAddress(locHost, 0)); - - sock.setTcpNoDelay(tcpNoDelay); - - if (sockRcvBuf > 0) - sock.setReceiveBufferSize(sockRcvBuf); - - if (sockSndBuf > 0) - sock.setSendBufferSize(sockSndBuf); - - sock.connect(addr, (int)connTimeout); - - out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize); - - success = true; - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to connect to remote host " + - "[addr=" + addr + ", localHost=" + locHost + ']', e); - } - finally { - if (!success) - U.closeQuiet(sock); - } - } - - /** {@inheritDoc} */ - @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException { - try { - handshakeC.applyx(sock.getInputStream(), sock.getOutputStream()); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " + - sock.getRemoteSocketAddress(), e); - } - } - - /** {@inheritDoc} */ - @Override public boolean close() { - boolean res = super.close(); - - if (res) { - U.closeQuiet(out); - U.closeQuiet(sock); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - super.forceClose(); - - try { - out.flush(); - } - catch (IOException ignored) { - // No-op. - } - - // Do not call (directly or indirectly) out.close() here - // since it may cause a deadlock. - out.forceClose(); - - U.closeQuiet(sock); - } - - /** {@inheritDoc} */ - @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Client was closed: " + this); - - try { - out.write(data, 0, len); - - metricsLsnr.onBytesSent(len); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e); - } - - markUsed(); - } - - /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg) - throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Client was closed: " + this); - - assert writeBuf.hasArray(); - - try { - int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer()); - - metricsLsnr.onBytesSent(cnt); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e); - } - - markUsed(); - - return false; - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - @Override public void flushIfNeeded(long timeout) throws IOException { - assert timeout > 0; - - out.flushOnTimeout(timeout); - } - - /** {@inheritDoc} */ - @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridTcpCommunicationClient.class, this, super.toString()); - } - - /** - * - */ - private class UnsafeBufferedOutputStream extends FilterOutputStream { - /** The internal buffer where data is stored. */ - private final byte buf[]; - - /** Current size. */ - private int size; - - /** Count. */ - private int cnt; - - /** Message count. */ - private int msgCnt; - - /** Total messages size. */ - private int totalCnt; - - /** Lock. */ - private final ReentrantLock lock = new ReentrantLock(); - - /** Last flushed timestamp. */ - private volatile long lastFlushed = U.currentTimeMillis(); - - /** Cached flush timeout. */ - private volatile long flushTimeout; - - /** Buffer adjusted timestamp. */ - private long lastAdjusted = U.currentTimeMillis(); - - /** - * Creates a new buffered output stream to write data to the - * specified underlying output stream. - * - * @param out The underlying output stream. - */ - UnsafeBufferedOutputStream(OutputStream out) { - this(out, 8192); - } - - /** - * Creates a new buffered output stream to write data to the - * specified underlying output stream with the specified buffer - * size. - * - * @param out The underlying output stream. - * @param size The buffer size. - */ - UnsafeBufferedOutputStream(OutputStream out, int size) { - super(out); - - assert size >= 0; - - this.size = size; - buf = size > 0 ? new byte[size] : null; - } - - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) throws IOException { - assert b != null; - assert off == 0; - - // No buffering. - if (buf == null) { - lock.lock(); - - try { - out.write(b, 0, len); - } - finally { - lock.unlock(); - } - - return; - } - - // Buffering is enabled. - lock.lock(); - - try { - msgCnt++; - totalCnt += len; - - if (len >= size) { - flushLocked(); - - out.write(b, 0, len); - - lastFlushed = U.currentTimeMillis(); - - adjustBufferIfNeeded(); - - return; - } - - if (cnt + len > size) { - flushLocked(); - - messageToBuffer0(b, off, len, buf, 0); - - cnt = len; - - assert cnt < size; - - adjustBufferIfNeeded(); - - return; - } - - messageToBuffer0(b, 0, len, buf, cnt); - - cnt += len; - - if (cnt == size) - flushLocked(); - else - flushIfNeeded(); - } - finally { - lock.unlock(); - } - } - - /** - * @throws IOException If failed. - */ - private void flushIfNeeded() throws IOException { - assert lock.isHeldByCurrentThread(); - assert buf != null; - - long flushTimeout0 = flushTimeout; - - if (flushTimeout0 > 0) - flushOnTimeoutLocked(flushTimeout0); - } - - /** - * - */ - private void adjustBufferIfNeeded() { - assert lock.isHeldByCurrentThread(); - assert buf != null; - - long flushTimeout0 = flushTimeout; - - if (flushTimeout0 > 0) - adjustBufferLocked(flushTimeout0); - } - - /** {@inheritDoc} */ - @Override public void flush() throws IOException { - lock.lock(); - - try { - flushLocked(); - } - finally { - lock.unlock(); - } - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - public void flushOnTimeout(long timeout) throws IOException { - assert buf != null; - assert timeout > 0; - - // Overwrite cached value. - flushTimeout = timeout; - - if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock()) - return; - - try { - flushOnTimeoutLocked(timeout); - } - finally { - lock.unlock(); - } - } - - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - private void flushOnTimeoutLocked(long timeout) throws IOException { - assert lock.isHeldByCurrentThread(); - assert timeout > 0; - - // Double check. - if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis()) - return; - - flushLocked(); - - adjustBufferLocked(timeout); - } - - /** - * @param timeout Timeout. - */ - private void adjustBufferLocked(long timeout) { - assert lock.isHeldByCurrentThread(); - assert timeout > 0; - - long time = U.currentTimeMillis(); - - if (lastAdjusted + timeout < time) { - if (msgCnt <= minBufferedMsgCnt) - size = 0; - else { - size = (int)(totalCnt * bufSizeRatio); - - if (size > buf.length) - size = buf.length; - } - - msgCnt = 0; - totalCnt = 0; - - lastAdjusted = time; - } - } - - /** - * @throws IOException If failed. - */ - private void flushLocked() throws IOException { - assert lock.isHeldByCurrentThread(); - - if (buf != null && cnt > 0) { - out.write(buf, 0, cnt); - - cnt = 0; - } - - out.flush(); - - lastFlushed = U.currentTimeMillis(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - lock.lock(); - - try { - flushLocked(); - } - finally { - try { - out.close(); - } - finally { - lock.unlock(); - } - } - } - - /** - * Forcibly closes underlying stream ignoring any possible exception. - */ - public void forceClose() { - try { - out.close(); - } - catch (IOException ignored) { - // No-op. - } - } - - /** - * @param b Buffer to copy from. - * @param off Offset in source buffer. - * @param len Length. - * @param resBuf Result buffer. - * @param resOff Result offset. - */ - private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) { - assert b.length == len; - assert off == 0; - assert resBuf.length >= resOff + len + 4; - - U.intToBytes(len, resBuf, resOff); - - U.arrayCopy(b, off, resBuf, resOff + 4, len); - } - - /** - * @param b Buffer to copy from (length included). - * @param off Offset in source buffer. - * @param len Length. - * @param resBuf Result buffer. - * @param resOff Result offset. - */ - private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) { - assert off == 0; - assert resBuf.length >= resOff + len; - - U.arrayCopy(b, off, resBuf, resOff, len); - } - - /** {@inheritDoc} */ - @Override public String toString() { - lock.lock(); - - try { - return S.toString(UnsafeBufferedOutputStream.class, this); - } - finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 788a8e6..abad875 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie return false; } - /** - * @param timeout Timeout. - * @throws IOException If failed. - */ - @Override public void flushIfNeeded(long timeout) throws IOException { - // No-op. - } - /** {@inheritDoc} */ @Override public boolean async() { return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java index bd24ecf..9e15d2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java @@ -202,7 +202,7 @@ public class VisorCache implements Serializable { offHeapAllocatedSize = ca.offHeapAllocatedSize(); offHeapEntriesCnt = ca.offHeapEntriesCount(); partitions = ca.affinity().partitions(); - metrics = VisorCacheMetrics.from(ignite, ca); + metrics = VisorCacheMetrics.from(ignite, cacheName); estimateMemorySize(ignite, ca, sample); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java index ef12424..c8913c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfigurationCollectorJob.java @@ -45,17 +45,17 @@ public class VisorCacheConfigurationCollectorJob /** {@inheritDoc} */ @Override protected Map<IgniteUuid, VisorCacheConfiguration> run(Collection<IgniteUuid> arg) { - Collection<GridCacheAdapter<?, ?>> caches = ignite.context().cache().internalCaches(); + Collection<IgniteCacheProxy<?, ?>> caches = ignite.context().cache().jcaches(); boolean all = arg == null || arg.isEmpty(); Map<IgniteUuid, VisorCacheConfiguration> res = U.newHashMap(caches.size()); - for (GridCacheAdapter<?, ?> cache : caches) { + for (IgniteCacheProxy<?, ?> cache : caches) { IgniteUuid deploymentId = cache.context().dynamicDeploymentId(); if (all || arg.contains(deploymentId)) - res.put(deploymentId, config(cache.configuration())); + res.put(deploymentId, config(cache.getConfiguration(CacheConfiguration.class))); } return res; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java index 30be424..c5d70a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.visor.cache; +import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; @@ -166,19 +167,21 @@ public class VisorCacheMetrics implements Serializable { /** * @param ignite Ignite. - * @param c Cache. + * @param cacheName Cache name. * @return Data transfer object for given cache metrics. */ - public static VisorCacheMetrics from(IgniteEx ignite, IgniteInternalCache c) { + public static VisorCacheMetrics from(IgniteEx ignite, String cacheName) { VisorCacheMetrics cm = new VisorCacheMetrics(); - CacheMetrics m = c.metrics(); - GridCacheProcessor cacheProcessor = ignite.context().cache(); - cm.name = c.name(); - cm.mode = cacheProcessor.cacheMode(c.name()); - cm.sys = cacheProcessor.systemCache(c.name()); + IgniteCache<Object, Object> c = cacheProcessor.jcache(cacheName); + + cm.name = cacheName; + cm.mode = cacheProcessor.cacheMode(cacheName); + cm.sys = cacheProcessor.systemCache(cacheName); + + CacheMetrics m = c.metrics(); cm.size = m.getSize(); cm.keySize = m.getKeySize(); @@ -208,7 +211,7 @@ public class VisorCacheMetrics implements Serializable { cm.commitsPerSec = perSecond(m.getAverageTxCommitTime()); cm.rollbacksPerSec = perSecond(m.getAverageTxRollbackTime()); - cm.qryMetrics = VisorCacheQueryMetrics.from(c.context().queries().metrics()); + cm.qryMetrics = VisorCacheQueryMetrics.from(c.queryMetrics()); cm.dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize(); cm.txThreadMapSize = m.getTxThreadMapSize(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java index 8fd42a0..23263c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetricsCollectorTask.java @@ -99,17 +99,19 @@ public class VisorCacheMetricsCollectorTask extends VisorMultiNodeTask<IgniteBiT GridCacheProcessor cacheProcessor = ignite.context().cache(); - Collection<GridCacheAdapter<?, ?>> caches = cacheProcessor.internalCaches(); + Collection<IgniteCacheProxy<?, ?>> caches = cacheProcessor.jcaches(); Collection<VisorCacheMetrics> res = new ArrayList<>(caches.size()); boolean allCaches = cacheNames.isEmpty(); - for (GridCacheAdapter ca : caches) { + for (IgniteCacheProxy ca : caches) { if (ca.context().started()) { - VisorCacheMetrics cm = VisorCacheMetrics.from(ignite, ca); + String cacheName = ca.getName(); - if ((allCaches || cacheNames.contains(ca.name())) && (showSysCaches || !cm.system())) + VisorCacheMetrics cm = VisorCacheMetrics.from(ignite, cacheName); + + if ((allCaches || cacheNames.contains(cacheName)) && (showSysCaches || !cm.system())) res.add(cm); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java index 06dbfbf..ab24a3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStoreConfiguration.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.visor.cache; -import org.apache.ignite.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.jdbc.*; import org.apache.ignite.configuration.*; @@ -72,10 +71,10 @@ public class VisorCacheStoreConfiguration implements Serializable { * @param ccfg Cache configuration. * @return Data transfer object for cache store configuration properties. */ - public static VisorCacheStoreConfiguration from(Ignite ignite, CacheConfiguration ccfg) { + public static VisorCacheStoreConfiguration from(IgniteEx ignite, CacheConfiguration ccfg) { VisorCacheStoreConfiguration cfg = new VisorCacheStoreConfiguration(); - GridCacheAdapter<Object, Object> c = ((IgniteKernal)ignite).internalCache(ccfg.getName()); + IgniteCacheProxy<Object, Object> c = ignite.context().cache().jcache(ccfg.getName()); CacheStore store = c != null && c.context().started() ? c.context().store().configuredStore() : null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java index fde871b..3b2d45c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java @@ -23,6 +23,7 @@ import org.apache.ignite.compute.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.*; +import org.apache.ignite.internal.visor.util.*; import org.jetbrains.annotations.*; import java.util.*; @@ -87,7 +88,7 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData else { // Ignore nodes that left topology. if (!(unhandledEx instanceof ClusterGroupEmptyException)) - taskRes.unhandledEx().put(nid, unhandledEx); + taskRes.unhandledEx().put(nid, new VisorExceptionWrapper(unhandledEx)); } } } @@ -116,13 +117,13 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData taskRes.events().addAll(jobRes.events()); if (jobRes.eventsEx() != null) - taskRes.eventsEx().put(nid, jobRes.eventsEx()); + taskRes.eventsEx().put(nid, new VisorExceptionWrapper(jobRes.eventsEx())); if (!jobRes.caches().isEmpty()) taskRes.caches().put(nid, jobRes.caches()); if (jobRes.cachesEx() != null) - taskRes.cachesEx().put(nid, jobRes.cachesEx()); + taskRes.cachesEx().put(nid, new VisorExceptionWrapper(jobRes.cachesEx())); if (!jobRes.igfss().isEmpty()) taskRes.igfss().put(nid, jobRes.igfss()); @@ -131,6 +132,6 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData taskRes.igfsEndpoints().put(nid, jobRes.igfsEndpoints()); if (jobRes.igfssEx() != null) - taskRes.igfssEx().put(nid, jobRes.igfssEx()); + taskRes.igfssEx().put(nid, new VisorExceptionWrapper(jobRes.igfssEx())); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java index 6485978..1a4eb02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.visor.node; import org.apache.ignite.internal.visor.cache.*; import org.apache.ignite.internal.visor.event.*; import org.apache.ignite.internal.visor.igfs.*; +import org.apache.ignite.internal.visor.util.*; import java.io.*; import java.util.*; @@ -32,7 +33,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { private static final long serialVersionUID = 0L; /** Unhandled exceptions from nodes. */ - private final Map<UUID, Throwable> unhandledEx = new HashMap<>(); + private final Map<UUID, VisorExceptionWrapper> unhandledEx = new HashMap<>(); /** Nodes grid names. */ private final Map<UUID, String> gridNames = new HashMap<>(); @@ -50,13 +51,13 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { private final List<VisorGridEvent> evts = new ArrayList<>(); /** Exceptions caught during collecting events from nodes. */ - private final Map<UUID, Throwable> evtsEx = new HashMap<>(); + private final Map<UUID, VisorExceptionWrapper> evtsEx = new HashMap<>(); /** All caches collected from nodes. */ private final Map<UUID, Collection<VisorCache>> caches = new HashMap<>(); /** Exceptions caught during collecting caches from nodes. */ - private final Map<UUID, Throwable> cachesEx = new HashMap<>(); + private final Map<UUID, VisorExceptionWrapper> cachesEx = new HashMap<>(); /** All IGFS collected from nodes. */ private final Map<UUID, Collection<VisorIgfs>> igfss = new HashMap<>(); @@ -65,7 +66,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { private final Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints = new HashMap<>(); /** Exceptions caught during collecting IGFS from nodes. */ - private final Map<UUID, Throwable> igfssEx = new HashMap<>(); + private final Map<UUID, VisorExceptionWrapper> igfssEx = new HashMap<>(); /** * @return {@code true} If no data was collected. @@ -88,7 +89,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { /** * @return Unhandled exceptions from nodes. */ - public Map<UUID, Throwable> unhandledEx() { + public Map<UUID, VisorExceptionWrapper> unhandledEx() { return unhandledEx; } @@ -123,7 +124,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { /** * @return Exceptions caught during collecting events from nodes. */ - public Map<UUID, Throwable> eventsEx() { + public Map<UUID, VisorExceptionWrapper> eventsEx() { return evtsEx; } @@ -137,7 +138,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { /** * @return Exceptions caught during collecting caches from nodes. */ - public Map<UUID, Throwable> cachesEx() { + public Map<UUID, VisorExceptionWrapper> cachesEx() { return cachesEx; } @@ -158,7 +159,7 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { /** * @return Exceptions caught during collecting IGFS from nodes. */ - public Map<UUID, Throwable> igfssEx() { + public Map<UUID, VisorExceptionWrapper> igfssEx() { return igfssEx; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java index 8b39d09..9fc1cc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeSuppressedErrorsTask.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.*; +import org.apache.ignite.internal.visor.util.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -83,12 +84,21 @@ public class VisorNodeSuppressedErrorsTask extends VisorMultiNodeTask<Map<UUID, List<IgniteExceptionRegistry.ExceptionInfo> errors = ignite.context().exceptionRegistry().getErrors(order); + List<IgniteExceptionRegistry.ExceptionInfo> wrapped = new ArrayList<>(errors.size()); + for (IgniteExceptionRegistry.ExceptionInfo error : errors) { if (error.order() > order) order = error.order(); + + wrapped.add(new IgniteExceptionRegistry.ExceptionInfo(error.order(), + new VisorExceptionWrapper(error.error()), + error.message(), + error.threadId(), + error.threadName(), + error.time())); } - return new IgniteBiTuple<>(order, errors); + return new IgniteBiTuple<>(order, wrapped); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java index 4a9daad..e977d2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.*; +import org.apache.ignite.internal.visor.util.*; import org.apache.ignite.lang.*; import javax.cache.*; @@ -36,7 +37,7 @@ import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*; /** * Job for execute SCAN or SQL query and get first page of results. */ -public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? extends Exception, VisorQueryResultEx>> { +public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx>> { /** */ private static final long serialVersionUID = 0L; @@ -61,11 +62,11 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten } /** {@inheritDoc} */ - @Override protected IgniteBiTuple<? extends Exception, VisorQueryResultEx> run(VisorQueryArg arg) { + @Override protected IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx> run(VisorQueryArg arg) { try { UUID nid = ignite.localNode().id(); - boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN"); + boolean scan = arg.queryTxt() == null; String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" + UUID.randomUUID(); @@ -110,8 +111,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten Collection<GridQueryFieldMetadata> meta = cur.fieldsMeta(); if (meta == null) - return new IgniteBiTuple<Exception, VisorQueryResultEx>( - new SQLException("Fail to execute query. No metadata available."), null); + return new IgniteBiTuple<>( + new VisorExceptionWrapper(new SQLException("Fail to execute query. No metadata available.")), null); else { List<VisorQueryField> names = new ArrayList<>(meta.size()); @@ -138,7 +139,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten } } catch (Exception e) { - return new IgniteBiTuple<>(e, null); + return new IgniteBiTuple<>(VisorTaskUtils.wrap(e), null); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java index 4f2fda5..98c876a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java @@ -19,13 +19,14 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.visor.*; +import org.apache.ignite.internal.visor.util.*; import org.apache.ignite.lang.*; /** * Task for execute SCAN or SQL query and get first page of results. */ @GridInternal -public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTuple<? extends Exception, VisorQueryResultEx>> { +public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTuple<? extends VisorExceptionWrapper, VisorQueryResultEx>> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java new file mode 100644 index 0000000..d2ae0e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorExceptionWrapper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.util; + +/** + * Exception wrapper for safe for transferring to Visor. + */ +public class VisorExceptionWrapper extends Throwable { + /** */ + private static final long serialVersionUID = 0L; + + /** Detail message string of this throwable */ + private String detailMsg; + + /** Simple class name of base throwable object. */ + private String clsSimpleName; + + /** Class name of base throwable object. */ + private String clsName; + + /** + * Wrap throwable by presented on Visor throwable object. + * + * @param cause Base throwable object. + */ + public VisorExceptionWrapper(Throwable cause) { + assert cause != null; + + clsSimpleName = cause.getClass().getSimpleName(); + clsName = cause.getClass().getName(); + + detailMsg = cause.getMessage(); + + StackTraceElement[] stackTrace = cause.getStackTrace(); + + if (stackTrace != null) + setStackTrace(stackTrace); + + if (cause.getCause() != null) + initCause(new VisorExceptionWrapper(cause.getCause())); + } + + /** + * @return Class simple name of base throwable object. + */ + public String getClassSimpleName() { + return clsSimpleName; + } + + /** + * @return Class name of base throwable object. + */ + public String getClassName() { + return clsName; + } + + /** {@inheritDoc} */ + @Override public String getMessage() { + return detailMsg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return (detailMsg != null) ? (clsName + ": " + detailMsg) : clsName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index e8ae76d..b0afbc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -867,4 +867,14 @@ public class VisorTaskUtils { return bos.toByteArray(); } + + /** + * Wrap throwable object of any type to presented on Visor throwable object. + * + * @param e Base throwable object. + * @return Wrapped throwable object. + */ + public static VisorExceptionWrapper wrap(Throwable e) { + return new VisorExceptionWrapper(e); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java index 2ad07b5..5cdc72f 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java @@ -100,14 +100,94 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean, public long getOverflowSize(); /** {@inheritDoc} */ + @MXBeanDescription("Number of gets from off-heap memory.") + public long getOffHeapGets(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of puts to off-heap memory.") + public long getOffHeapPuts(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of removed entries from off-heap memory.") + public long getOffHeapRemovals(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of evictions from off-heap memory.") + public long getOffHeapEvictions(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of hits on off-heap memory.") + public long getOffHeapHits(); + + /** {@inheritDoc} */ + @MXBeanDescription("Percentage of hits on off-heap memory.") + public float getOffHeapHitPercentage(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of misses on off-heap memory.") + public long getOffHeapMisses(); + + /** {@inheritDoc} */ + @MXBeanDescription("Percentage of misses on off-heap memory.") + public float getOffHeapMissPercentage(); + + /** {@inheritDoc} */ @MXBeanDescription("Number of entries stored in off-heap memory.") public long getOffHeapEntriesCount(); /** {@inheritDoc} */ + @MXBeanDescription("Number of primary entries stored in off-heap memory.") + public long getOffHeapPrimaryEntriesCount(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of backup stored in off-heap memory.") + public long getOffHeapBackupEntriesCount(); + + /** {@inheritDoc} */ @MXBeanDescription("Memory size allocated in off-heap.") public long getOffHeapAllocatedSize(); /** {@inheritDoc} */ + @MXBeanDescription("Off-heap memory maximum size.") + public long getOffHeapMaxSize(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of gets from swap.") + public long getSwapGets(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of puts to swap.") + public long getSwapPuts(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of removed entries from swap.") + public long getSwapRemovals(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of hits on swap.") + public long getSwapHits(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of misses on swap.") + public long getSwapMisses(); + + /** {@inheritDoc} */ + @MXBeanDescription("Percentage of hits on swap.") + public float getSwapHitPercentage(); + + /** {@inheritDoc} */ + @MXBeanDescription("Percentage of misses on swap.") + public float getSwapMissPercentage(); + + /** {@inheritDoc} */ + @MXBeanDescription("Number of entries stored in swap.") + public long getSwapEntriesCount(); + + /** {@inheritDoc} */ + @MXBeanDescription("Size of swap.") + public long getSwapSize(); + + /** {@inheritDoc} */ @MXBeanDescription("Number of non-null values in the cache.") public int getSize(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java index 17bbc36..f064fde 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginProvider.java @@ -19,13 +19,22 @@ package org.apache.ignite.plugin; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; /** - * Pluggable ignite component. + * Pluggable Ignite component. + * <p> + * Ignite plugins are loaded using JDK {@link ServiceLoader}. + * First method called to initialize plugin is {@link PluginProvider#initExtensions(PluginContext, ExtensionRegistry)}. + * If plugin requires configuration it can be set in {@link IgniteConfiguration} using + * {@link IgniteConfiguration#setPluginConfigurations(PluginConfiguration...)}. + * + * @see IgniteConfiguration#setPluginConfigurations(PluginConfiguration...) + * @see PluginContext */ public interface PluginProvider<C extends PluginConfiguration> { /** @@ -49,18 +58,21 @@ public interface PluginProvider<C extends PluginConfiguration> { public <T extends IgnitePlugin> T plugin(); /** + * Registers extensions. + * * @param ctx Plugin context. - * @param cls Ignite component class. - * @return Ignite component or {@code null} if component is not supported. + * @param registry Extension registry. */ - @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls); + public void initExtensions(PluginContext ctx, ExtensionRegistry registry); /** - * Register extensions. + * Creates Ignite component. + * * @param ctx Plugin context. - * @param registry Extension registry. + * @param cls Ignite component class. + * @return Ignite component or {@code null} if component is not supported. */ - public void initExtensions(PluginContext ctx, ExtensionRegistry registry); + @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls); /** * Starts grid component. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 871512c..6e7a706 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -23,13 +23,14 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.resources.*; -import org.apache.ignite.spi.swapspace.*; + import org.jetbrains.annotations.*; import javax.management.*; @@ -197,7 +198,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement * Inject ignite instance. */ @IgniteInstanceResource - protected void injectResources(Ignite ignite){ + protected void injectResources(Ignite ignite) { this.ignite = ignite; if (ignite != null) { @@ -453,19 +454,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement boolean isSpiConsistent = false; - String tipStr = " (fix configuration or set " + "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)"; + String tipStr = " (fix configuration or set " + + "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)"; if (rmtCls == null) { if (!optional && starting) - throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + " [name=" + name + - ", loc=" + locCls + ']'); + throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + + " [name=" + name + ", loc=" + locCls + ']'); sb.a(format(">>> Remote SPI with the same name is not configured: " + name, locCls)); } else if (!locCls.equals(rmtCls)) { if (!optional && starting) - throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr + " [name=" + name + - ", loc=" + locCls + ", rmt=" + rmtCls + ']'); + throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr + + " [name=" + name + ", loc=" + locCls + ", rmt=" + rmtCls + ']'); sb.a(format(">>> Remote SPI with the same name is of different type: " + name, locCls, rmtCls)); } @@ -542,9 +544,25 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** + * @param obj Timeout object. + * @see IgniteSpiContext#addTimeoutObject(IgniteSpiTimeoutObject) + */ + protected void addTimeoutObject(IgniteSpiTimeoutObject obj) { + spiCtx.addTimeoutObject(obj); + } + + /** + * @param obj Timeout object. + * @see IgniteSpiContext#removeTimeoutObject(IgniteSpiTimeoutObject) + */ + protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + spiCtx.removeTimeoutObject(obj); + } + + /** * Temporarily SPI context. */ - private static class GridDummySpiContext implements IgniteSpiContext { + private class GridDummySpiContext implements IgniteSpiContext { /** */ private final ClusterNode locNode; @@ -627,27 +645,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** {@inheritDoc} */ - @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val, - @Nullable ClassLoader ldr) { - /* No-op. */ - } - - /** {@inheritDoc} */ - @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) { - return null; - } - - /** {@inheritDoc} */ @Override public int partition(String cacheName, Object key) { return -1; } /** {@inheritDoc} */ - @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) { - // No-op. - } - - /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes() { return locNode == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode); } @@ -713,12 +715,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** {@inheritDoc} */ - @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key, - @Nullable ClassLoader ldr) { - return null; - } - - /** {@inheritDoc} */ @Override public MessageFormatter messageFormatter() { return msgFormatter; } @@ -737,5 +733,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement @Override public boolean tryFailNode(UUID nodeId) { return false; } + + /** {@inheritDoc} */ + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + assert ignite instanceof IgniteKernal : ignite; + + ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); + } + + /** {@inheritDoc} */ + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + assert ignite instanceof IgniteKernal : ignite; + + ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 6852b6d..f83326c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; -import org.apache.ignite.spi.swapspace.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -253,30 +252,6 @@ public interface IgniteSpiContext { public <K> boolean containsKey(String cacheName, K key); /** - * Writes object to swap. - * - * @param spaceName Swap space name. - * @param key Key. - * @param val Value. - * @param ldr Class loader (optional). - * @throws IgniteException If any exception occurs. - */ - public void writeToSwap(String spaceName, Object key, @Nullable Object val, @Nullable ClassLoader ldr) - throws IgniteException; - - /** - * Reads object from swap. - * - * @param spaceName Swap space name. - * @param key Key. - * @param ldr Class loader (optional). - * @return Swapped value. - * @throws IgniteException If any exception occurs. - */ - @Nullable public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) - throws IgniteException; - - /** * Calculates partition number for given key. * * @param cacheName Cache name. @@ -286,16 +261,6 @@ public interface IgniteSpiContext { public int partition(String cacheName, Object key); /** - * Removes object from swap. - * - * @param spaceName Swap space name. - * @param key Key. - * @param ldr Class loader (optional). - * @throws IgniteException If any exception occurs. - */ - public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) throws IgniteException; - - /** * Validates that new node can join grid topology, this method is called on coordinator * node before new node joins topology. * @@ -322,18 +287,6 @@ public interface IgniteSpiContext { public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteException; /** - * Reads swapped cache value from off-heap and swap. - * - * @param spaceName Off-heap space name. - * @param key Key. - * @param ldr Class loader for unmarshalling. - * @return Value. - * @throws IgniteException If any exception occurs. - */ - @Nullable public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key, - @Nullable ClassLoader ldr) throws IgniteException; - - /** * Gets message formatter. * * @return Message formatter. @@ -357,4 +310,14 @@ public interface IgniteSpiContext { * @return If node was failed. */ public boolean tryFailNode(UUID nodeId); + + /** + * @param c Timeout object. + */ + public void addTimeoutObject(IgniteSpiTimeoutObject c); + + /** + * @param c Timeout object. + */ + public void removeTimeoutObject(IgniteSpiTimeoutObject c); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java new file mode 100644 index 0000000..b3fc28e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import org.apache.ignite.lang.*; + +/** + * Provides possibility to schedule delayed execution, + * see {@link IgniteSpiContext#addTimeoutObject(IgniteSpiTimeoutObject)}. + * <p> + * Note: all timeout objects are executed in single dedicated thread, so implementation + * of {@link #onTimeout()} should not use time consuming and blocking method. + */ +public interface IgniteSpiTimeoutObject { + /** + * @return Unique object ID. + */ + public IgniteUuid id(); + + /** + * @return End time. + */ + public long endTime(); + + /** + * Timeout callback. + */ + public void onTimeout(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java index 460cff3..832d872 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java @@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi } /** {@inheritDoc} */ - @Override - public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException { + @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index fd17791..359de1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default idle connection timeout (value is <tt>30000</tt>ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; - /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */ - public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100; - - /** Default value for connection buffer size (value is <tt>0</tt>). */ - public static final int DFLT_CONN_BUF_SIZE = 0; - /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; @@ -267,7 +261,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - recoveryWorker.addReconnectRequest(recoveryData); + commWorker.addReconnectRequest(recoveryData); } } else @@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Idle connection timeout. */ private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT; - /** Connection buffer flush frequency. */ - private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ; - - /** Connection buffer size. */ - @SuppressWarnings("RedundantFieldInitialization") - private int connBufSize = DFLT_CONN_BUF_SIZE; - /** Connect timeout. */ private long connTimeout = DFLT_CONN_TIMEOUT; @@ -647,17 +634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; - /** Idle client worker. */ - private IdleClientWorker idleClientWorker; - - /** Flush client worker. */ - private ClientFlushWorker clientFlushWorker; - - /** Socket timeout worker. */ - private SocketTimeoutWorker sockTimeoutWorker; - - /** Recovery worker. */ - private RecoveryWorker recoveryWorker; + /** Recovery and idle clients handler. */ + private CommunicationWorker commWorker; /** Clients. */ private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); @@ -882,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. - * <p> - * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}. * * @param connBufSize Connection buffer size. * @see #setConnectionBufferFlushFrequency(long) */ @IgniteSpiConfiguration(optional = true) public void setConnectionBufferSize(int connBufSize) { - this.connBufSize = connBufSize; + // No-op. } /** {@inheritDoc} */ @Override public int getConnectionBufferSize() { - return connBufSize; + return 0; } /** {@inheritDoc} */ @IgniteSpiConfiguration(optional = true) @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { - this.connBufFlushFreq = connBufFlushFreq; + // No-op. } /** {@inheritDoc} */ @Override public long getConnectionBufferFlushFrequency() { - return connBufFlushFreq; + return 0; } /** @@ -1174,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0"); - assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0"); - assertParameter(connBufSize >= 0, "connBufSize >= 0"); assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0"); assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0"); assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); @@ -1245,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("idleConnTimeout", idleConnTimeout)); log.debug(configInfo("directBuf", directBuf)); log.debug(configInfo("directSendBuf", directSndBuf)); - log.debug(configInfo("connBufSize", connBufSize)); - log.debug(configInfo("connBufFlushFreq", connBufFlushFreq)); log.debug(configInfo("selectorsCnt", selectorsCnt)); log.debug(configInfo("tcpNoDelay", tcpNoDelay)); log.debug(configInfo("sockSndBuf", sockSndBuf)); @@ -1261,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize)); } - if (connBufSize > 8192) - U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " + - "intentionally) [specified=" + connBufSize + ", recommended=8192]", - "Specified communication IO buffer size is larger than recommended (ignore if done intentionally)."); - if (!tcpNoDelay) U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " + "since may produce significant delays with some scenarios."); @@ -1274,23 +1241,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.start(); - idleClientWorker = new IdleClientWorker(); + commWorker = new CommunicationWorker(); - idleClientWorker.start(); - - recoveryWorker = new RecoveryWorker(); - - recoveryWorker.start(); - - if (connBufSize > 0) { - clientFlushWorker = new ClientFlushWorker(); - - clientFlushWorker.start(); - } - - sockTimeoutWorker = new SocketTimeoutWorker(); - - sockTimeoutWorker.start(); + commWorker.start(); // Ack start. if (log.isDebugEnabled()) @@ -1445,15 +1398,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); - U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); - U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); // Force closing on stop (safety). for (GridCommunicationClient client : clients.values()) @@ -1461,7 +1408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Clear resources. nioSrvr = null; - idleClientWorker = null; + commWorker = null; boundTcpPort = -1; @@ -1899,7 +1846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ) throws IgniteCheckedException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); long rcvCnt = 0; @@ -2005,7 +1952,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Ignoring whatever happened after timeout - reporting only timeout event. if (!cancelled) @@ -2041,15 +1988,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); - U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); - U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -2156,80 +2097,95 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private class IdleClientWorker extends IgniteSpiThread { + private class CommunicationWorker extends IgniteSpiThread { + /** */ + private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); + /** * */ - IdleClientWorker() { - super(gridName, "nio-idle-client-collector", log); + private CommunicationWorker() { + super(gridName, "tcp-comm-worker", log); } /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Tcp communication worker has been started."); + while (!isInterrupted()) { - cleanupRecovery(); + GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); - for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { - UUID nodeId = e.getKey(); + if (recoveryDesc != null) + processRecovery(recoveryDesc); + else + processIdle(); + } + } - GridCommunicationClient client = e.getValue(); + /** + * + */ + private void processIdle() { + cleanupRecovery(); - ClusterNode node = getSpiContext().node(nodeId); + for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { + UUID nodeId = e.getKey(); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Forcing close of non-existent node connection: " + nodeId); + GridCommunicationClient client = e.getValue(); - client.forceClose(); + ClusterNode node = getSpiContext().node(nodeId); - clients.remove(nodeId, client); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Forcing close of non-existent node connection: " + nodeId); - continue; - } + client.forceClose(); - GridNioRecoveryDescriptor recovery = null; + clients.remove(nodeId, client); - if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + continue; + } - if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { - RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + GridNioRecoveryDescriptor recovery = null; - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + - ", rcvCnt=" + msg.received() + ']'); + if (client instanceof GridTcpNioCommunicationClient) { + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); - recovery.lastAcknowledged(msg.received()); + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); - continue; - } - } + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - long idleTime = client.getIdleTime(); + recovery.lastAcknowledged(msg.received()); - if (idleTime >= idleConnTimeout) { - if (recovery != null && - recovery.nodeAlive(getSpiContext().node(nodeId)) && - !recovery.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Node connection is idle, but there are unacknowledged messages, " + - "will wait: " + nodeId); + continue; + } + } - continue; - } + long idleTime = client.getIdleTime(); + if (idleTime >= idleConnTimeout) { + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesFutures().isEmpty()) { if (log.isDebugEnabled()) - log.debug("Closing idle node connection: " + nodeId); + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); - if (client.close() || client.closed()) - clients.remove(nodeId, client); + continue; } - } - Thread.sleep(idleConnTimeout); + if (log.isDebugEnabled()) + log.debug("Closing idle node connection: " + nodeId); + + if (client.close() || client.closed()) + clients.remove(nodeId, client); + } } } @@ -2264,212 +2220,39 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } } - } - - /** - * - */ - private class ClientFlushWorker extends IgniteSpiThread { - /** - * - */ - ClientFlushWorker() { - super(gridName, "nio-client-flusher", log); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - long connBufFlushFreq0 = connBufFlushFreq; - - for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) { - GridCommunicationClient client = entry.getValue(); - - if (client.reserve()) { - boolean err = true; - - try { - client.flushIfNeeded(connBufFlushFreq0); - - err = false; - } - catch (IOException e) { - if (getSpiContext().pingNode(entry.getKey())) - U.error(log, "Failed to flush client: " + client, e); - else { - if (log.isDebugEnabled()) - log.debug("Failed to flush client (node left): " + client); - - onException("Failed to flush client (node left): " + client, e); - } - } - finally { - if (err) - client.forceClose(); - else - client.release(); - } - } - } - - Thread.sleep(connBufFlushFreq0); - } - } - } - - /** - * Handles sockets timeouts. - */ - private class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() { - @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-comm-sock-timeout-worker", log); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } /** - * @param timeoutObj Timeout object to remove. + * @param recoveryDesc Recovery descriptor. */ - public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); + private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { + ClusterNode node = recoveryDesc.node(); - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) { - HandshakeTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); + if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; - timeoutObj.onTimeout(); - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - HandshakeTimeoutObject first = timeoutObjs.firstx(); + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); + GridCommunicationClient client = reserveClient(node); - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } + client.release(); } - } - } - - /** - * - */ - private class RecoveryWorker extends IgniteSpiThread { - /** */ - private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); - - /** - * - */ - private RecoveryWorker() { - super(gridName, "tcp-comm-recovery-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Recovery worker has been started."); - - while (!isInterrupted()) { - GridNioRecoveryDescriptor recoveryDesc = q.take(); - - assert recoveryDesc != null; - - ClusterNode node = recoveryDesc.node(); - - if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) - continue; - - try { + catch (IgniteCheckedException | IgniteException e) { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - - GridCommunicationClient client = reserveClient(node); + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - client.release(); + addReconnectRequest(recoveryDesc); } - catch (IgniteCheckedException e) { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - - addReconnectRequest(recoveryDesc); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); - } + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); } } } @@ -2497,12 +2280,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private static class HandshakeTimeoutObject<T> { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - + private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject { /** */ - private final long id = idGen.incrementAndGet(); + private final IgniteUuid id = IgniteUuid.randomUuid(); /** */ private final T obj; @@ -2533,34 +2313,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return done.compareAndSet(false, true); } - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { + /** {@inheritDoc} */ + @Override public void onTimeout() { if (done.compareAndSet(false, true)) { // Close socket - timeout occurred. if (obj instanceof GridCommunicationClient) ((GridCommunicationClient)obj).forceClose(); else U.closeQuiet((AbstractInterruptibleChannel)obj); - - return true; } - - return false; } - /** - * @return End time. - */ - long endTime() { + /** {@inheritDoc} */ + @Override public long endTime() { return endTime; } - /** - * @return ID. - */ - long id() { + /** {@inheritDoc} */ + @Override public IgniteUuid id() { return id; }
