http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4e20425..7fbf253 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; @@ -387,6 +386,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { subjId, taskNameHash); + if (tx.system()) { + AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), tx); + + // If there is another system transaction in progress, use it's topology version to prevent deadlock. + if (topVer != null) + tx.topologyVersion(topVer); + } + return onCreated(sysCacheCtx, tx); } @@ -484,26 +491,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { }); for (IgniteInternalTx tx : txs()) { - // Must wait for all transactions, even for DHT local and DHT remote since preloading may acquire - // values pending to be overwritten by prepared transaction. 
- - if (tx.concurrency() == PESSIMISTIC) { - if (tx.topologyVersion().compareTo(AffinityTopologyVersion.ZERO) > 0 - && tx.topologyVersion().compareTo(topVer) < 0) - // For PESSIMISTIC mode we must wait for all uncompleted txs - // as we do not know in advance which keys will participate in tx. - res.add(tx.finishFuture()); - } - else if (tx.concurrency() == OPTIMISTIC) { - // For OPTIMISTIC mode we wait only for txs in PREPARING state that - // have keys for given partitions. - TransactionState state = tx.state(); - AffinityTopologyVersion txTopVer = tx.topologyVersion(); - - if ((state != ACTIVE && state != COMMITTED && state != ROLLED_BACK && state != UNKNOWN) - && txTopVer.compareTo(AffinityTopologyVersion.ZERO) > 0 && txTopVer.compareTo(topVer) < 0) - res.add(tx.finishFuture()); - } + if (needWaitTransaction(tx, topVer)) + res.add(tx.finishFuture()); } res.markInitialized(); @@ -512,6 +501,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param tx Transaction. + * @param topVer Exchange version. + * @return {@code True} if need wait transaction for exchange. + */ + public boolean needWaitTransaction(IgniteInternalTx tx, AffinityTopologyVersion topVer) { + AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot(); + + return txTopVer != null && txTopVer.compareTo(topVer) < 0; + } + + /** * Transaction start callback (has to do with when any operation was * performed on this transaction). * @@ -536,21 +536,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Reverse mapped version look up. - * - * @param dhtVer Dht version. - * @return Near version. - */ - @Nullable public GridCacheVersion nearVersion(GridCacheVersion dhtVer) { - IgniteInternalTx tx = idMap.get(dhtVer); - - if (tx != null) - return tx.nearXidVersion(); - - return null; - } - - /** * @param from Near version. * @return DHT version for a near version. 
*/ @@ -1445,7 +1430,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { try { // Renew cache entry. - txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key())); + txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key(), tx.topologyVersion())); } catch (GridDhtInvalidPartitionException e) { assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" + @@ -1494,7 +1479,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry); // Renew cache entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion())); } } } @@ -1940,11 +1925,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public int hashCode() { - int result = (int)(threadId ^ (threadId >>> 32)); + int res = (int)(threadId ^ (threadId >>> 32)); - result = 31 * result + cacheId; + res = 31 * res + cacheId; - return result; + return res; } } @@ -1979,32 +1964,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Atomic integer that compares only using references, not values. - */ - private static final class AtomicInt extends AtomicInteger { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param initVal Initial value. - */ - private AtomicInt(int initVal) { - super(initVal); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - // Reference only. - return obj == this; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return super.hashCode(); - } - } - - /** * Commit listener. Checks if commit succeeded and rollbacks if case of error. */ private class CommitListener implements CI1<IgniteInternalFuture<IgniteInternalTx>> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java index e6cba00..95aab74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java @@ -23,7 +23,6 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.nio.ByteBuffer; import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -72,10 +71,10 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, * @param dataCenterId Replication data center ID. 
*/ public GridCacheVersion(int topVer, long globalTime, long order, int nodeOrder, int dataCenterId) { - assert topVer >= 0; - assert order >= 0; - assert nodeOrder >= 0; - assert dataCenterId < 32 && dataCenterId >= 0; + assert topVer >= 0 : topVer; + assert order >= 0 : order; + assert nodeOrder >= 0 : nodeOrder; + assert dataCenterId < 32 && dataCenterId >= 0 : dataCenterId; if (nodeOrder > NODE_ORDER_MASK) throw new IllegalArgumentException("Node order overflow: " + nodeOrder); @@ -350,6 +349,9 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>, /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridCacheVersion.class, this); + return "GridCacheVersion [topVer=" + topologyVersion() + + ", time=" + globalTime() + + ", order=" + order() + + ", nodeOrder=" + nodeOrder() + ']'; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index 6642219..c89b941 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -158,4 +158,13 @@ public class GridCacheVersionEx extends GridCacheVersion { drVer.writeExternal(out); } + + /** {@inheritDoc} */ + @Override public String toString() { + return "GridCacheVersionEx [topVer=" + topologyVersion() + + ", time=" + globalTime() + + ", order=" + order() + + ", nodeOrder=" + nodeOrder() + + ", drVer=" + drVer + ']'; + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 166c713..9be8b50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -119,7 +119,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @param ver Remote version. */ public void onReceived(UUID nodeId, long ver) { - if (ver > 0) + if (ver > 0) { while (true) { long order = this.order.get(); @@ -138,6 +138,25 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { break; } + } + } + + /** + * @param rcvOrder Received order. 
+ */ + public void onExchange(long rcvOrder) { + long order; + + while (true) { + order = this.order.get(); + + if (rcvOrder > order) { + if (this.order.compareAndSet(order, rcvOrder)) + break; + } + else + break; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 99e0bb5..abafe85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -64,7 +64,6 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 0b02abd..10aa71e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -960,7 +960,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { IgniteOutClosureX<GridCacheQueueHeader> rmv = new IgniteOutClosureX<GridCacheQueueHeader>() { @Override public GridCacheQueueHeader applyx() throws IgniteCheckedException { - return (GridCacheQueueHeader)cctx.cache().getAndRemove(new GridCacheQueueHeaderKey(name)); + return (GridCacheQueueHeader)cctx.cache().withNoRetries().getAndRemove(new GridCacheQueueHeaderKey(name)); } }; @@ -1545,7 +1545,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { IgniteOutClosureX<GridCacheSetHeader> rmv = new IgniteOutClosureX<GridCacheSetHeader>() { @Override public GridCacheSetHeader applyx() throws IgniteCheckedException { - return (GridCacheSetHeader)cctx.cache().getAndRemove(new GridCacheSetHeaderKey(name)); + return (GridCacheSetHeader)cctx.cache().withNoRetries().getAndRemove(new GridCacheSetHeaderKey(name)); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java index d217390..fce8b1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java @@ -74,7 +74,7 @@ public class OdbcMessageParser { BinaryInputStream stream = new BinaryHeapInputStream(msg); - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null); 
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true); byte cmd = reader.readByte(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java index d89176b..f6e0079 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java @@ -192,6 +192,7 @@ public class PlatformContextImpl implements PlatformContext { in, ctx.config().getClassLoader(), null, + true, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index e58b862..8aa69a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -44,6 +44,7 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridClosureCallMode; import org.apache.ignite.internal.GridKernalContext; import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -51,9 +52,12 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -90,7 +94,6 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; -import static org.apache.ignite.events.EventType.EVTS_DISCOVERY; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -103,6 +106,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Time to wait before reassignment retries. */ private static final long RETRY_TIMEOUT = 1000; + /** */ + private static final int[] EVTS = { + EventType.EVT_NODE_JOINED, + EventType.EVT_NODE_LEFT, + EventType.EVT_NODE_FAILED, + DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT + }; + /** Local service instances. 
*/ private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>(); @@ -168,7 +179,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache = ctx.cache().utilityCache(); if (!ctx.clientNode()) - ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); + ctx.event().addLocalEventListener(topLsnr, EVTS); try { if (ctx.deploy().enabled()) @@ -741,7 +752,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @throws IgniteCheckedException If failed. */ - private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException { + private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException { ServiceConfiguration cfg = dep.configuration(); Object nodeFilter = cfg.getNodeFilter(); @@ -755,7 +766,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Object affKey = cfg.getAffinityKey(); while (true) { - GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer); + GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion()); Collection<ClusterNode> nodes; @@ -785,7 +796,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Map<UUID, Integer> cnts = new HashMap<>(); if (affKey != null) { - ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer)); + ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer); if (n != null) { int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt; @@ -1128,7 +1139,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { svcName.set(dep.configuration().getName()); // Ignore other utility cache events. 
- long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null); @@ -1188,23 +1199,23 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param dep Service deployment. * @param topVer Topology version. */ - private void onDeployment(final GridServiceDeployment dep, final long topVer) { + private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { // Retry forever. try { - long newTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); // If topology version changed, reassignment will happen from topology event. - if (newTopVer == topVer) + if (newTopVer.equals(topVer)) reassign(dep, topVer); } catch (IgniteCheckedException e) { if (!(e instanceof ClusterTopologyCheckedException)) log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - long newTopVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - if (newTopVer != topVer) { - assert newTopVer > topVer; + if (!newTopVer.equals(topVer)) { + assert newTopVer.compareTo(topVer) > 0; // Reassignment will happen from topology event. 
return; @@ -1245,16 +1256,28 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ private class TopologyListener implements GridLocalEventListener { /** {@inheritDoc} */ - @Override public void onEvent(final Event evt) { + @Override public void onEvent(Event evt) { if (!busyLock.enterBusy()) return; try { + final AffinityTopologyVersion topVer; + + if (evt instanceof DiscoveryCustomEvent) { + DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage(); + + topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion(); + + if (msg instanceof CacheAffinityChangeMessage) { + if (!((CacheAffinityChangeMessage)msg).exchangeNeeded()) + return; + } + } + else + topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + depExe.submit(new BusyRunnable() { @Override public void run0() { - AffinityTopologyVersion topVer = - new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion()); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); if (oldest != null && oldest.isLocal()) { @@ -1281,7 +1304,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity(). affinityReadyFuture(topVer).get(); - reassign(dep, topVer.topologyVersion()); + reassign(dep, topVer); } catch (IgniteCheckedException ex) { if (!(e instanceof ClusterTopologyCheckedException)) @@ -1298,7 +1321,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } if (!retries.isEmpty()) - onReassignmentFailed(topVer.topologyVersion(), retries); + onReassignmentFailed(topVer, retries); } // Clean up zombie assignments. @@ -1335,13 +1358,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @param retries Retries. 
*/ - private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment> retries) { + private void onReassignmentFailed(final AffinityTopologyVersion topVer, + final Collection<GridServiceDeployment> retries) { if (!busyLock.enterBusy()) return; try { // If topology changed again, let next event handle it. - if (ctx.discovery().topologyVersion() != topVer) + if (ctx.discovery().topologyVersionEx().equals(topVer)) return; for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index b8de1d3..ad6f0e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9489,4 +9489,20 @@ public abstract class IgniteUtils { public static boolean isToStringMethod(Method mtd) { return toStringMtd.equals(mtd); } + + /** + * @param threadId Thread ID. + * @return Thread name if found. 
+ */ + public static String threadName(long threadId) { + Thread[] threads = new Thread[Thread.activeCount()]; + + int cnt = Thread.enumerate(threads); + + for (int i = 0; i < cnt; i++) + if (threads[i].getId() == threadId) + return threads[i].getName(); + + return "<failed to find active thread " + threadId + '>'; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index c6a6a44..ea7a202 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -213,9 +213,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements lsnr = lsnr0; else if (lsnr instanceof ArrayListener) ((ArrayListener)lsnr).add(lsnr0); - else { + else lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0); - } return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 6b7710e..88e34e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import 
org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -137,6 +138,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; @@ -3190,6 +3192,10 @@ class ServerImpl extends TcpDiscoveryImpl { Boolean rmtMarshUseDfltSuid = node.attribute(ATTR_MARSHALLER_USE_DFLT_SUID); boolean rmtMarshUseDfltSuidBool = rmtMarshUseDfltSuid == null ? true : rmtMarshUseDfltSuid; + Boolean locLateAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); + // Can be null only in tests. + boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false; + if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) { String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + " property value differs from remote node's value " + @@ -3200,34 +3206,17 @@ class ServerImpl extends TcpDiscoveryImpl { ", rmtNodeAddrs=" + U.addressesAsString(node) + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; - LT.warn(log, null, errMsg); - - // Always output in debug. 
- if (log.isDebugEnabled()) - log.debug(errMsg); - - try { - String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + - " property value differs from remote node's value " + - "(to make sure all nodes in topology have identical marshaller settings, " + - "configure system property explicitly) " + - "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid + - ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid + - ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + - ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + - ", rmtNodeId=" + locNode.id() + ']'; - - trySendMessageDirectly(node, - new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg)); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']'); + String sndMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + + " property value differs from remote node's value " + + "(to make sure all nodes in topology have identical marshaller settings, " + + "configure system property explicitly) " + + "[locMarshUseDfltSuid=" + rmtMarshUseDfltSuid + + ", rmtMarshUseDfltSuid=" + locMarshUseDfltSuid + + ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + + ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + + ", rmtNodeId=" + locNode.id() + ']'; - onException("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']', e); - } + nodeCheckError(node, errMsg, sndMsg); // Ignore join request. 
return; @@ -3249,31 +3238,48 @@ class ServerImpl extends TcpDiscoveryImpl { ", rmtNodeAddrs=" + U.addressesAsString(node) + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; - LT.warn(log, null, errMsg); + String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + + "the same property on remote node (make sure all nodes in topology have the same value " + + "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool + + ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool + + ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + + ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + + ", rmtNodeId=" + locNode.id() + ']'; - // Always output in debug. - if (log.isDebugEnabled()) - log.debug(errMsg); + nodeCheckError(node, errMsg, sndMsg); - try { - String sndMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + - "the same property on remote node (make sure all nodes in topology have the same value " + - "of \"compactFooter\" property) [locMarshallerCompactFooter=" + rmtMarshCompactFooterBool + - ", rmtMarshallerCompactFooter=" + locMarshCompactFooterBool + - ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + - ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + - ", rmtNodeId=" + locNode.id() + ']'; + // Ignore join request. 
+ return; + } - trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg)); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']'); + boolean rmtLateAssignBool; - onException("Failed to send marshaller check failed message to node " + - "[node=" + node + ", err=" + e.getMessage() + ']', e); - } + if (node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0) { + Boolean rmtLateAssign = node.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); + // Can be null only in tests. + rmtLateAssignBool = rmtLateAssign != null ? rmtLateAssign : false; + } + else + rmtLateAssignBool = false; + + if (locLateAssignBool != rmtLateAssignBool) { + String errMsg = "Local node's cache affinity assignment mode differs from " + + "the same property on remote node (make sure all nodes in topology have the same " + + "cache affinity assignment mode) [locLateAssign=" + locLateAssignBool + + ", rmtLateAssign=" + rmtLateAssignBool + + ", locNodeAddrs=" + U.addressesAsString(locNode) + + ", rmtNodeAddrs=" + U.addressesAsString(node) + + ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']'; + + String sndMsg = "Local node's cache affinity assignment mode differs from " + + "the same property on remote node (make sure all nodes in topology have the same " + + "cache affinity assignment mode) [locLateAssign=" + rmtLateAssignBool + + ", rmtLateAssign=" + locLateAssign + + ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() + + ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() + + ", rmtNodeId=" + locNode.id() + ']'; + + nodeCheckError(node, errMsg, sndMsg); // Ignore join request. return; @@ -3300,6 +3306,31 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param node Joining node. + * @param errMsg Message to log. 
+ * @param sndMsg Message to send. + */ + private void nodeCheckError(TcpDiscoveryNode node, String errMsg, String sndMsg) { + LT.warn(log, null, errMsg); + + // Always output in debug. + if (log.isDebugEnabled()) + log.debug(errMsg); + + try { + trySendMessageDirectly(node, new TcpDiscoveryCheckFailedMessage(locNode.id(), sndMsg)); + } + catch (IgniteSpiException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send marshaller check failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']'); + + onException("Failed to send marshaller check failed message to node " + + "[node=" + node + ", err=" + e.getMessage() + ']', e); + } + } + + /** * Tries to send a message to all node's available addresses. * * @param node Node to send message to. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 277055a..df152f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1701,6 +1701,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * @param joiningNodeID Joining node ID. * @param nodeId Remote node ID for which data is provided. * @param data Collection of marshalled discovery data objects from different components. + * @param clsLdr Class loader. 
*/ protected void onExchange(UUID joiningNodeID, UUID nodeId, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java index 616fd43..3b4f033 100644 --- a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java @@ -89,7 +89,7 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { this.backups = backups; this.funcType = funcType; - startGrids(nodesCnt); + startGridsMultiThreaded(nodesCnt, true); try { IgniteCache<Object, Object> cache = jcache(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java index 24704ed..47b01f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java @@ -101,7 +101,9 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - startGrids(NODE_CNT); + startGridsMultiThreaded(NODE_CNT - 1); + + startGrid(NODE_CNT - 1); // Start client after servers. 
} /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java index 7420a0d..f910dee 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java @@ -191,6 +191,8 @@ public class FairAffinityFunctionNodesSelfTest extends GridCommonAbstractTest { started.remove(idx); } + awaitPartitionMapExchange(); + topVer++; info("Grid 0: " + grid(0).localNode().id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java index 9e438e9..38685f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java @@ -51,7 +51,6 @@ public class GridAffinitySelfTest extends GridCommonAbstractTest { disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); disco.setIpFinder(IP_FINDER); - disco.setForceServerMode(true); cfg.setDiscoverySpi(disco); @@ -73,7 +72,9 @@ public class GridAffinitySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - startGridsMultiThreaded(1, 2); + startGrid(2); + + startGrid(1); } /** 
{@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java index 3b33b83..f804cb3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java @@ -116,11 +116,13 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest { final AtomicInteger gridIdx = new AtomicInteger(1); + final long stopTime = System.currentTimeMillis() + 60_000; + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { int grid = gridIdx.getAndIncrement(); - while (!stop.get()) { + while (!stop.get() && System.currentTimeMillis() < stopTime) { stopGrid(grid); startGrid(grid); @@ -131,8 +133,6 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest { }, 2, "restart-thread"); try { - long stopTime = System.currentTimeMillis() + 60_000; - while (System.currentTimeMillis() < stopTime) { Collection<IgniteFuture<?>> futs = new ArrayList<>(1000); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 0c005e9..6869d1c 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -84,7 +84,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra cfg.setDiscoverySpi(disco); - BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi(); + BlockTcpCommunicationSpi commSpi = new BlockTcpCommunicationSpi(); commSpi.setSharedMemoryPort(-1); @@ -143,8 +143,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra * @param ignite Node. * @return Communication SPI. */ - protected BlockTpcCommunicationSpi commSpi(Ignite ignite) { - return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi()); + protected BlockTcpCommunicationSpi commSpi(Ignite ignite) { + return ((BlockTcpCommunicationSpi)ignite.configuration().getCommunicationSpi()); } /** {@inheritDoc} */ @@ -403,7 +403,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra /** * */ - protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi { + protected static class BlockTcpCommunicationSpi extends TcpCommunicationSpi { /** */ volatile Class msgCls; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 13cac81..4653ce9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -192,7 +192,7 @@ public class IgniteClientReconnectAtomicsTest extends 
IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqInProg", 0, true); @@ -360,7 +360,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(srvAtomicRef.compareAndSet("2st value", "3st value")); assertEquals("3st value", srvAtomicRef.get()); - BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + BlockTcpCommunicationSpi servCommSpi = commSpi(srv); servCommSpi.blockMessage(GridNearLockResponse.class); @@ -520,7 +520,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertEquals(2, srvAtomicStamped.value()); assertEquals(2, srvAtomicStamped.stamp()); - BlockTpcCommunicationSpi servCommSpi = commSpi(srv); + BlockTcpCommunicationSpi servCommSpi = commSpi(srv); servCommSpi.blockMessage(GridNearLockResponse.class); @@ -648,7 +648,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongInProggress", 0, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 100e8de..8ee669c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -312,7 
+312,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertFalse(srvSet.add("1")); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); if (colCfg.getAtomicityMode() == ATOMIC) commSpi.blockMessage(GridNearAtomicUpdateResponse.class); @@ -454,7 +454,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(srvQueue.contains("1")); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); if (colCfg.getAtomicityMode() == ATOMIC) commSpi.blockMessage(GridNearAtomicUpdateResponse.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java index a9d4b7d..cce0c7e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java @@ -56,7 +56,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr for (int i = 0; i < 100; i++) cache.put(i, i); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(GridJobExecuteResponse.class); @@ -105,7 +105,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(GridJobExecuteResponse.class); @@ -154,7 +154,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr 
Ignite srv = clientRouter(client); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(GridJobExecuteResponse.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java index 6b15b22..0409122 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** @@ -64,6 +65,7 @@ public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectFail ccfg2.setName(TX_CACHE); ccfg2.setBackups(1); ccfg2.setAtomicityMode(TRANSACTIONAL); + ccfg2.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheConfiguration(ccfg1, ccfg2); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java index 4db523d..3e961e5 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java @@ -134,7 +134,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst Ignite srv = clientRouter(client); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(GridNearTxPrepareResponse.class); @@ -187,7 +187,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst assertNotNull(srvc); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(GridJobExecuteResponse.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java index cc492e9..1e87751 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java @@ -141,7 +141,7 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst final IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME); - BlockTpcCommunicationSpi commSpi = commSpi(srv); + BlockTcpCommunicationSpi commSpi = commSpi(srv); commSpi.blockMessage(DataStreamerResponse.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 8a602ad..307a470 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; @@ -82,6 +83,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } if (block) { + ignite.log().info("Block message [node=" + node.id() + + ", msg=" + ioMsg.message() + ']'); + blockedMsgs.add(new T2<>(node, ioMsg)); return; @@ -147,9 +151,19 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { public void stopBlock() { synchronized (this) { blockCls.clear(); + blockP = null; + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + try { + ignite.log().info("Send blocked message [node=" + msg.get1().id() + + ", msg=" + msg.get2().message() + ']'); - for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) - super.sendMessage(msg.get1(), msg.get2()); + super.sendMessage(msg.get1(), msg.get2()); + } + catch (Throwable e) { + U.error(ignite.log(), "Failed to send blocked message: " + msg, e); + } + } blockedMsgs.clear(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index 8530fbb..a1762cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -50,10 +50,10 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "myCache"; /** */ - private static final int MAX_FAILOVER_ATTEMPTS = 105; + private static final int MAX_FAILOVER_ATTEMPTS = 500; /** */ - private static final int SERVERS_COUNT = 4; + private static final int SRVS = 4; /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -79,7 +79,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(ccfg); - if (gridName.equals(getTestGridName(SERVERS_COUNT))) { + if (gridName.equals(getTestGridName(SRVS))) { cfg.setClientMode(true); spi.setForceServerMode(true); @@ -97,7 +97,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testAffinityCallRestartNode() throws Exception { - startGridsMultiThreaded(SERVERS_COUNT); + startGridsMultiThreaded(SRVS); final int ITERS = 5; @@ -106,7 +106,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { Integer key = primaryKey(grid(0).cache(CACHE_NAME)); - long topVer = grid(0).cluster().topologyVersion(); + AffinityTopologyVersion topVer = grid(0).context().discovery().topologyVersionEx(); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -116,10 +116,10 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { return null; } - }); + }, "stop-thread"); while (!fut.isDone()) - grid(1).compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, topVer, topVer + 1)); + grid(1).compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, topVer)); fut.get(); @@ -136,17 +136,17 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { public void testAffinityCallNoServerNode() throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-1741"); - startGridsMultiThreaded(SERVERS_COUNT + 1); + startGridsMultiThreaded(SRVS + 1); final Integer key = 1; - final Ignite client = grid(SERVERS_COUNT); + final Ignite client = grid(SRVS); assertTrue(client.configuration().isClientMode()); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - for (int i = 0; i < SERVERS_COUNT; ++i) + for (int i = 0; i < SRVS; ++i) stopGrid(i, false); return null; @@ -155,7 +155,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { try { while (!fut.isDone()) - client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key)); + client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null)); } catch (ClusterTopologyException ignore) { log.info("Expected error: " + ignore); @@ -177,36 +177,31 
@@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { private Ignite ignite; /** */ - private long[] topVers; + private AffinityTopologyVersion topVer; /** * @param key Key. - * @param topVers Topology versions to check. + * @param topVer Topology version. */ - public CheckCallable(Object key, long... topVers) { + public CheckCallable(Object key, AffinityTopologyVersion topVer) { this.key = key; - this.topVers = topVers; + this.topVer = topVer; } /** {@inheritDoc} */ @Override public Object call() throws IgniteCheckedException { - if (topVers.length > 0) { - boolean pass = false; - + if (topVer != null) { GridCacheAffinityManager aff = ((IgniteKernal)ignite).context().cache().internalCache(CACHE_NAME).context().affinity(); ClusterNode loc = ignite.cluster().localNode(); - for (long topVer : topVers) { - if (loc.equals(aff.primary(key, new AffinityTopologyVersion(topVer, 0)))) { - pass = true; + if (loc.equals(aff.primary(key, topVer))) + return true; - break; - } - } + AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer.topologyVersion() + 1, 0); - assertTrue(pass); + assertEquals(loc, aff.primary(key, topVer0)); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNamesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNamesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNamesSelfTest.java index 6f65b16..f6fc77c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNamesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNamesSelfTest.java @@ -28,6 +28,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; * Test that validates {@link Ignite#cacheNames()} 
implementation. */ public class CacheNamesSelfTest extends GridCommonAbstractTest { + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -43,7 +46,10 @@ public class CacheNamesSelfTest extends GridCommonAbstractTest { CacheConfiguration cacheCfg3 = new CacheConfiguration(); cacheCfg3.setCacheMode(CacheMode.LOCAL); - cfg.setCacheConfiguration(cacheCfg1, cacheCfg2, cacheCfg3); + if (client) + cfg.setClientMode(true); + else + cfg.setCacheConfiguration(cacheCfg1, cacheCfg2, cacheCfg3); return cfg; } @@ -61,6 +67,14 @@ public class CacheNamesSelfTest extends GridCommonAbstractTest { for (String name : names) assertTrue(name == null || name.equals("replicated") || name.equals("partitioned")); + + client = true; + + Ignite client = startGrid(2); + + names = client.cacheNames(); + + assertEquals(3, names.size()); } finally { stopAllGrids(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearUpdateTopologyChangeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearUpdateTopologyChangeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearUpdateTopologyChangeAbstractTest.java index ce1c0f7..e2ad79f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearUpdateTopologyChangeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearUpdateTopologyChangeAbstractTest.java @@ -54,6 +54,8 @@ public abstract class CacheNearUpdateTopologyChangeAbstractTest extends IgniteCa * @throws Exception If failed. 
*/ public void testNearUpdateTopologyChange() throws Exception { + awaitPartitionMapExchange(); + final Affinity<Integer> aff = grid(0).affinity(null); final Integer key = 9; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java index b60ada7..0e821bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java @@ -108,6 +108,8 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest { startGrids(2); + awaitPartitionMapExchange(); + Ignite ignite = grid(1); cache = ignite.cache(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java index 6572d31..cc013bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; 
import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -411,7 +412,14 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr cmp.set(barrier); - barrier.await(60_000, TimeUnit.MILLISECONDS); + try { + barrier.await(60_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + U.dumpThreads(log); + + fail("Failed to check cache content: " + e); + } log.info("Cache content check done."); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java index c3c2d47..14dc60d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java @@ -92,6 +92,8 @@ public class GridCacheDeploymentSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); + cfg.setLateAffinityAssignment(false); + return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java index 197a62a..ad37e17 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryVersionSelfTest.java @@ -37,7 +37,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager.TOP_VER_BASE_TIME; /** - * Tests that entry version is + * */ public class GridCacheEntryVersionSelfTest extends GridCommonAbstractTest { /** IP finder. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManagerDeserializationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManagerDeserializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManagerDeserializationTest.java index d12f56a..4a069a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManagerDeserializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManagerDeserializationTest.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.processors.cache; +import java.io.Serializable; +import java.util.Map; +import javax.cache.Cache; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; @@ -40,13 +45,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jsr166.ConcurrentHashMap8; -import javax.cache.Cache; -import 
javax.cache.integration.CacheLoaderException; -import javax.cache.integration.CacheWriterException; -import java.io.Serializable; -import java.util.Map; - +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * Checks whether storing to local store doesn't cause binary objects unmarshalling, @@ -66,19 +68,25 @@ public class GridCacheStoreManagerDeserializationTest extends GridCommonAbstract /** Test cache name. */ protected static final String CACHE_NAME = "cache_name"; - /** Cache mode. */ + /** + * @return Cache mode. + */ protected CacheMode cacheMode() { - return CacheMode.PARTITIONED; + return PARTITIONED; } - /** Cache write order mode. */ - protected CacheAtomicWriteOrderMode cacheAtomicWriteOrderMode() { - return CacheAtomicWriteOrderMode.PRIMARY; + /** + * @return Cache write order mode. + */ + private CacheAtomicWriteOrderMode cacheAtomicWriteOrderMode() { + return PRIMARY; } - /** Cache synchronization mode. */ + /** + * @return Cache synchronization mode. + */ private CacheWriteSynchronizationMode cacheWriteSynchronizationMode() { - return CacheWriteSynchronizationMode.FULL_SYNC; + return FULL_SYNC; } /** {@inheritDoc} */ @@ -142,7 +150,7 @@ public class GridCacheStoreManagerDeserializationTest extends GridCommonAbstract /** * Check whether test objects are stored correctly via stream API. * - * @throws Exception + * @throws Exception If failed. 
*/ public void testStream() throws Exception { final Ignite grid = startGrid(); @@ -167,7 +175,7 @@ public class GridCacheStoreManagerDeserializationTest extends GridCommonAbstract * {@link org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry#clearInternal( * GridCacheVersion, boolean, GridCacheObsoleteEntryExtras)} * - * @throws Exception + * @throws Exception If failed. */ public void testPartitionMove() throws Exception { final Ignite grid = startGrid("binaryGrid1"); @@ -205,7 +213,7 @@ public class GridCacheStoreManagerDeserializationTest extends GridCommonAbstract /** * Check whether binary objects are stored without unmarshalling via stream API. * - * @throws Exception + * @throws Exception If failed. */ public void testBinaryStream() throws Exception { final Ignite grid = startGrid("binaryGrid"); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java index 5e45ba8..512549d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTcpClientDiscoveryMultiThreadedTest.java @@ -145,7 +145,7 @@ public class GridCacheTcpClientDiscoveryMultiThreadedTest extends GridCacheAbstr assertEquals(i, (int) cache.get(i)); if (isNearCacheNode) - assertEquals(i, (int) cache.localPeek(i, CachePeekMode.ONHEAP)); + assertEquals((Integer)i, cache.localPeek(i, CachePeekMode.ONHEAP)); } stopGrid(clientIdx); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java new file mode 100644 index 0000000..3e80525 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Checks that cache entry versions keep increasing on every update while nodes join and leave the topology. + */ +public class GridCacheVersionTopologyChangeTest extends GridCommonAbstractTest { + /** IP finder shared by all nodes started by this test. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testVersionIncreaseAtomic() throws Exception { + checkVersionIncrease(cacheConfigurations(ATOMIC)); + } + + /** + * @throws Exception If failed. + */ + public void testVersionIncreaseTx() throws Exception { + checkVersionIncrease(cacheConfigurations(TRANSACTIONAL)); + } + + /** + * @param ccfgs Cache configurations to create on the first node and verify. + * @throws Exception If failed. + */ + private void checkVersionIncrease(List<CacheConfiguration<Object, Object>> ccfgs) throws Exception { + try { + assert ccfgs.size() > 0; + + Ignite ignite = startGrid(0); + + List<IgniteCache<Object, Object>> caches = new ArrayList<>(); + List<Set<Integer>> cachesKeys = new ArrayList<>(); + + for (CacheConfiguration<Object, Object> ccfg : ccfgs) { + IgniteCache<Object, Object> cache = ignite.createCache(ccfg); + + caches.add(cache); + + Affinity<Object> aff = ignite.affinity(ccfg.getName()); + + int parts = aff.partitions(); + + assert parts > 0 : parts; + + Set<Integer> keys = new HashSet<>(); + + for (int p = 0; p < parts; p++) { + for (int k = 0; k < 100_000; k++) { + if (aff.partition(k) == p) { + assertTrue(keys.add(k)); + + break; + } + } + } + + assertEquals(parts, keys.size()); + + cachesKeys.add(keys); + } + + List<Map<Integer, Comparable>> cachesVers = new ArrayList<>(); + + for (int i = 0; i < caches.size(); i++) { + IgniteCache<Object, Object> cache = caches.get(i); + + Map<Integer, Comparable> vers = new HashMap<>(); + + for (Integer k : cachesKeys.get(i)) { + cache.put(k, k); + + vers.put(k, cache.getEntry(k).version()); + } + + cachesVers.add(vers); + } + + for (int i = 0; i < caches.size(); i++) { + for (int k = 0; k < 10; k++) + checkVersionIncrease(caches.get(i), cachesVers.get(i)); + } + + int nodeIdx = 1; + + for (int n = 0; n < 10; n++) { + startGrid(nodeIdx++); + + for (int i = 0; i < caches.size(); i++) + checkVersionIncrease(caches.get(i), cachesVers.get(i)); + + awaitPartitionMapExchange(); + + for (int i = 0; i < caches.size(); i++) + checkVersionIncrease(caches.get(i), 
cachesVers.get(i)); + } + + for (int n = 1; n < nodeIdx; n++) { + log.info("Stop node: " + n); + + stopGrid(n); + + for (int i = 0; i < caches.size(); i++) + checkVersionIncrease(caches.get(i), cachesVers.get(i)); + + awaitPartitionMapExchange(); + + for (int i = 0; i < caches.size(); i++) + checkVersionIncrease(caches.get(i), cachesVers.get(i)); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param cache Cache to update. + * @param vers Last seen entry version per key; updated in place with each newly observed version. + */ + @SuppressWarnings("unchecked") + private void checkVersionIncrease(IgniteCache<Object, Object> cache, Map<Integer, Comparable> vers) { + for (Integer k : vers.keySet()) { + cache.put(k, k); + + Comparable curVer = vers.get(k); + + CacheEntry entry = cache.getEntry(k); + + if (entry != null) { + Comparable newVer = entry.version(); + + assertTrue(newVer.compareTo(curVer) > 0); + + vers.put(k, newVer); + } + else { + CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class); + + assertEquals(0, ccfg.getBackups()); + } + } + } + + /** + * @param atomicityMode Cache atomicity mode. + * @return Six cache configurations: rendezvous and fair affinity functions, with 0 or 1 backups. + */ + private List<CacheConfiguration<Object, Object>> cacheConfigurations(CacheAtomicityMode atomicityMode) { + List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration("c1", atomicityMode, new RendezvousAffinityFunction(), 0)); + ccfgs.add(cacheConfiguration("c2", atomicityMode, new RendezvousAffinityFunction(), 1)); + ccfgs.add(cacheConfiguration("c3", atomicityMode, new RendezvousAffinityFunction(false, 10), 0)); + + ccfgs.add(cacheConfiguration("c4", atomicityMode, new FairAffinityFunction(), 0)); + ccfgs.add(cacheConfiguration("c5", atomicityMode, new FairAffinityFunction(), 1)); + ccfgs.add(cacheConfiguration("c6", atomicityMode, new FairAffinityFunction(false, 10), 0)); + + return ccfgs; + } + + /** + * @param name Cache name. + * @param atomicityMode Cache atomicity mode. + * @param aff Affinity function. 
+ * @param backups Number of backups. + * @return Cache configuration using PRIMARY atomic write order and FULL_SYNC write synchronization. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(String name, + CacheAtomicityMode atomicityMode, + AffinityFunction aff, + int backups) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setBackups(backups); + ccfg.setName(name); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setAffinity(aff); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } +}
