http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index a4ff04b..90d6242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -22,14 +22,19 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; + import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; @@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private Map<Integer, GridDhtPartitionFullMap> parts; /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + + /** */ private byte[] partsBytes; /** Partitions update counters. */ @@ -61,6 +70,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Topology version. */ private AffinityTopologyVersion topVer; + /** */ + @GridDirectTransient + private transient boolean compress; + /** * Required by {@link Externalizable}. */ @@ -84,6 +97,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** + * @param compress {@code True} if it is possible to use compression for message. + */ + public void compress(boolean compress) { + this.compress = compress; + } + + /** * @return Local partitions. */ public Map<Integer, GridDhtPartitionFullMap> partitions() { @@ -92,14 +112,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param cacheId Cache ID. + * @return {@code True} if message contains full map for given cache. + */ + public boolean containsCache(int cacheId) { + return parts != null && parts.containsKey(cacheId); + } + + /** + * @param cacheId Cache ID. * @param fullMap Full partitions map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); - if (!parts.containsKey(cacheId)) + if (!parts.containsKey(cacheId)) { parts.put(cacheId, fullMap); + + if (dupDataCache != null) { + assert compress; + assert parts.containsKey(dupDataCache); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } + } } /** @@ -132,11 +172,38 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (parts != null && partsBytes == null) - partsBytes = U.marshal(ctx, parts); + boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null); + + if (marshal) { + byte[] partsBytes0 = null; + byte[] partCntrsBytes0 = null; + + if (parts != null && partsBytes == null) + partsBytes0 = U.marshal(ctx, parts); - if (partCntrs != null && partCntrsBytes == null) - partCntrsBytes = U.marshal(ctx, partCntrs); + if (partCntrs != null && partCntrsBytes == null) + partCntrsBytes0 = U.marshal(ctx, partCntrs); + + if (compress) { + assert !compressed(); + + try { + byte[] partsBytesZip = U.zip(partsBytes0); + byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); + + partsBytes0 = partsBytesZip; + partCntrsBytes0 = partCntrsBytesZip; + + compressed(true); + } + catch (IgniteCheckedException e) { + U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); + } + } + + partsBytes = partsBytes0; + partCntrsBytes = partCntrsBytes0; + } } /** @@ -157,14 +224,49 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) - parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partsBytes != null && parts == null) { + if (compressed()) + parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionFullMap map1 = parts.get(e.getKey()); + GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + + assert map1 != null : e.getKey(); + assert map2 != null : e.getValue(); + assert map1.size() == map2.size(); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) { + GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey()); + + assert partMap1 != null && partMap1.map().isEmpty() : partMap1; + assert !partMap1.hasMovingPartitions() : partMap1; + + GridDhtPartitionMap2 partMap2 = e0.getValue(); + + assert partMap2 != null; + + for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet()) + partMap1.put(stateEntry.getKey(), stateEntry.getValue()); + } + } + } + } if (parts == null) parts = new HashMap<>(); - if (partCntrsBytes != null && partCntrs == null) - partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partCntrsBytes != null && partCntrs == null) { + if (compressed()) + partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } if (partCntrs == null) partCntrs = new HashMap<>(); @@ -185,19 +287,25 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } switch (writer.state()) { - case 5: + case 6: + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 6: + case 8: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); - case 7: + case 9: if (!writer.writeMessage("topVer", topVer)) return false; @@ -219,7 +327,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return false; switch (reader.state()) { - case 5: + case 6: + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -227,7 +343,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 6: + case 8: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -235,7 +351,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 7: + case 9: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -255,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 10; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index e4356b1..bf08f0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -23,12 +23,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectTransient private Map<Integer, GridDhtPartitionMap2> parts; + /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + /** Serialized partitions. */ private byte[] partsBytes; @@ -59,6 +67,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** */ private boolean client; + /** */ + @GridDirectTransient + private transient boolean compress; + /** * Required by {@link Externalizable}. */ @@ -70,13 +82,16 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param exchId Exchange ID. * @param client Client message flag. * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, - @Nullable GridCacheVersion lastVer) { + @Nullable GridCacheVersion lastVer, + boolean compress) { super(exchId, lastVer); this.client = client; + this.compress = compress; } /** @@ -87,16 +102,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * Adds partition map to this message. - * * @param cacheId Cache ID to add local partition for. * @param locMap Local partition map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) { + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); parts.put(cacheId, locMap); + + if (dupDataCache != null) { + assert compress; + assert F.isEmpty(locMap.map()); + assert parts.containsKey(dupDataCache); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } } /** @@ -136,22 +161,77 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (partsBytes == null && parts != null) - partsBytes = U.marshal(ctx, parts); + boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null); + + if (marshal) { + byte[] partsBytes0 = null; + byte[] partCntrsBytes0 = null; + + if (parts != null && partsBytes == null) + partsBytes0 = U.marshal(ctx, parts); - if (partCntrsBytes == null && partCntrs != null) - partCntrsBytes = U.marshal(ctx, partCntrs); + if (partCntrs != null && partCntrsBytes == null) + partCntrsBytes0 = U.marshal(ctx, partCntrs); + + if (compress) { + assert !compressed(); + + try { + byte[] partsBytesZip = U.zip(partsBytes0); + byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); + + partsBytes0 = partsBytesZip; + partCntrsBytes0 = partCntrsBytesZip; + + compressed(true); + } + catch (IgniteCheckedException e) { + U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); + } + } + + partsBytes = partsBytes0; + partCntrsBytes = partCntrsBytes0; + } } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) - parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partsBytes != null && parts == null) { + if (compressed()) + parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + if (partCntrsBytes != null && partCntrs == null) { + if (compressed()) + partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionMap2 map1 = parts.get(e.getKey()); - if (partCntrsBytes != null && partCntrs == null) - partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + assert map1 != null : e.getKey(); + assert F.isEmpty(map1.map()); + assert !map1.hasMovingPartitions(); + + GridDhtPartitionMap2 map2 = parts.get(e.getValue()); + + assert map2 != null : e.getValue(); + assert map2.map() != null; + + for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet()) + map1.put(e0.getKey(), e0.getValue()); + } + } } /** {@inheritDoc} */ @@ -169,19 +249,25 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } switch (writer.state()) { - case 5: + case 6: if (!writer.writeBoolean("client", client)) return false; writer.incrementState(); - case 6: + case 7: + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 8: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 7: + case 9: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -203,7 +289,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes return false; switch (reader.state()) { - case 5: + case 6: client = reader.readBoolean("client"); if (!reader.isLastRead()) @@ -211,7 +297,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 6: + case 7: + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -219,7 +313,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 7: + case 9: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -239,7 +333,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index a4106af..850b6d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 6; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 09aec81..d6865c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -35,8 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -600,7 +600,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']'); - GridAffinityAssignment assignment = cctx.affinity().assignment(topVer); + AffinityAssignment assignment = cctx.affinity().assignment(topVer); boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 3a559e7..9fd9b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheContext cctx = interCache != null ? interCache.context() : null; if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); + cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false)); routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } @@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); if (cache != null && !cache.isLocal() && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); + req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 7b011dd..e0f4a2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.util; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.Externalizable; @@ -128,6 +130,8 @@ import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; import javax.management.DynamicMBean; import javax.management.JMException; import javax.management.MBeanServer; @@ -9693,6 +9697,32 @@ public abstract class IgniteUtils { } /** + * @param marsh Marshaller. + * @param zipBytes Zip-compressed bytes. + * @param clsLdr Class loader to use. + * @return Unmarshalled object. + * @throws IgniteCheckedException + */ + public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { + assert marsh != null; + assert zipBytes != null; + + try { + ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes)); + + in.getNextEntry(); + + return marsh.unmarshal(in, clsLdr); + } + catch (IgniteCheckedException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + + /** * Unmarshals object from the input stream using given class loader. * This method should not close given input stream. * <p/> @@ -9907,4 +9937,38 @@ public abstract class IgniteUtils { if (oldName != curName) LOC_IGNITE_NAME.set(oldName); } + + /** + * @param bytes Byte array to compress. + * @return Compressed bytes. + * @throws IgniteCheckedException If failed. + */ + public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException { + try { + if (bytes == null) + return null; + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + try (ZipOutputStream zos = new ZipOutputStream(bos)) { + ZipEntry entry = new ZipEntry(""); + + try { + entry.setSize(bytes.length); + + zos.putNextEntry(entry); + + zos.write(bytes); + } + finally { + zos.closeEntry(); + } + } + + return bos.toByteArray(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index f929121..733d204 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - TcpDiscoveryAbstractMessage msg = null; + TcpDiscoveryAbstractMessage msg; while (!Thread.currentThread().isInterrupted()) { Socket sock; @@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl { continue; } - if (msg == null) - msg = queue.poll(); + msg = queue.poll(); if (msg == null) { mux.wait(); @@ -1121,19 +1120,13 @@ class ClientImpl extends TcpDiscoveryImpl { } } } - catch (IOException e) { + catch (InterruptedException e) { if (log.isDebugEnabled()) - U.error(log, "Failed to send node left message (will stop anyway) " + - "[sock=" + sock + ", msg=" + msg + ']', e); - - U.closeQuiet(sock); + log.debug("Client socket writer interrupted."); - synchronized (mux) { - if (sock == this.sock) - this.sock = null; // Connection has dead. - } + return; } - catch (IgniteCheckedException e) { + catch (Exception e) { if (spi.getSpiContext().isStopping()) { if (log.isDebugEnabled()) log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']'); @@ -1141,7 +1134,12 @@ class ClientImpl extends TcpDiscoveryImpl { else U.error(log, "Failed to send message: " + msg, e); - msg = null; + U.closeQuiet(sock); + + synchronized (mux) { + if (sock == this.sock) + this.sock = null; // Connection has dead. + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0de787d..8814745 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -133,7 +133,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -167,7 +167,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe */ class ServerImpl extends TcpDiscoveryImpl { /** */ - private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024); + private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE, 512); /** */ private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE = @@ -1479,7 +1479,7 @@ class ServerImpl extends TcpDiscoveryImpl { private void prepareNodeAddedMessage( TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, + @Nullable Collection<PendingMessage> msgs, @Nullable IgniteUuid discardMsgId, @Nullable IgniteUuid discardCustomMsgId ) { @@ -1506,7 +1506,19 @@ class ServerImpl extends TcpDiscoveryImpl { } nodeAddedMsg.topology(topToSnd); - nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId); + + Collection<TcpDiscoveryAbstractMessage> msgs0 = null; + + if (msgs != null) { + msgs0 = new ArrayList<>(msgs.size()); + + for (PendingMessage pendingMsg : msgs) { + if (pendingMsg.msg != null) + msgs0.add(pendingMsg.msg); + } + } + + nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId); Map<Long, Collection<ClusterNode>> hist; @@ -1892,7 +1904,10 @@ class ServerImpl extends TcpDiscoveryImpl { assert spi.ensured(msg) && msg.verified() : msg; if (msg instanceof TcpDiscoveryNodeAddedMessage) { - TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + TcpDiscoveryNodeAddedMessage addedMsg = + new TcpDiscoveryNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg); + + msg = addedMsg; TcpDiscoveryNode node = addedMsg.node(); @@ -1910,12 +1925,109 @@ class ServerImpl extends TcpDiscoveryImpl { addedMsg.clientTopology(top); } + + // Do not need this data for client reconnect. + addedMsg.oldNodesDiscoveryData(null); + } + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; + + if (addFinishMsg.clientDiscoData() != null) { + addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); + + msg = addFinishMsg; + + Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData(); + + Set<UUID> replaced = null; + + for (TcpDiscoveryAbstractMessage msg0 : msgs) { + if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) { + Map<UUID, Map<Integer, byte[]>> existingDiscoData = + ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData(); + + // Check if already stored message contains the same data to do not store copies multiple times. + if (existingDiscoData != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) { + UUID nodeId = e.getKey(); + + if (F.contains(replaced, nodeId)) + continue; + + Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey()); + + if (existingData != null && mapsEqual(e.getValue(), existingData)) { + e.setValue(existingData); + + if (replaced == null) + replaced = new HashSet<>(); + + boolean add = replaced.add(nodeId); + + assert add; + + if (replaced.size() == discoData.size()) + break; + } + } + + if (replaced != null && replaced.size() == discoData.size()) + break; + } + } + } + } } + else if (msg instanceof TcpDiscoveryNodeLeftMessage) + clearClientAddFinished(msg.creatorNodeId()); + else if (msg instanceof TcpDiscoveryNodeFailedMessage) + clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId()); msgs.add(msg); } /** + * @param clientId Client node ID. + */ + private void clearClientAddFinished(UUID clientId) { + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; + + if (addFinishMsg.clientDiscoData() != null && clientId.equals(addFinishMsg.nodeId())) { + addFinishMsg.clientDiscoData(null); + addFinishMsg.clientNodeAttributes(null); + + break; + } + } + } + } + + /** + * @param m1 Map 1. + * @param m2 Map 2. + * @return {@code True} if maps contain the same data. + */ + private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) { + if (m1 == m2) + return true; + + if (m1.size() == m2.size()) { + for (Map.Entry<Integer, byte[]> e : m1.entrySet()) { + byte[] data = m2.get(e.getKey()); + + if (!Arrays.equals(e.getValue(), data)) + return false; + } + + return true; + } + + return false; + } + + /** * Gets messages starting from provided ID (exclusive). If such * message is not found, {@code null} is returned (this indicates * a failure condition when it was already removed from queue). @@ -2009,6 +2121,37 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * + */ + private static class PendingMessage { + /** */ + TcpDiscoveryAbstractMessage msg; + + /** */ + final boolean customMsg; + + /** */ + final IgniteUuid id; + + /** + * @param msg Message. + */ + PendingMessage(TcpDiscoveryAbstractMessage msg) { + assert msg != null && msg.id() != null : msg; + + this.msg = msg; + + id = msg.id(); + customMsg = msg instanceof TcpDiscoveryCustomEventMessage; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PendingMessage.class, this); + } + } + + /** * Pending messages container. */ private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> { @@ -2016,7 +2159,7 @@ class ServerImpl extends TcpDiscoveryImpl { private static final int MAX = 1024; /** Pending messages. */ - private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + private final Queue<PendingMessage> msgs = new ArrayDeque<>(MAX * 2); /** Processed custom message IDs. */ private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2); @@ -2024,7 +2167,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** Discarded message ID. */ private IgniteUuid discardId; - /** Discarded message ID. */ + /** Discarded custom message ID. */ private IgniteUuid customDiscardId; /** @@ -2034,14 +2177,14 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void add(TcpDiscoveryAbstractMessage msg) { - msgs.add(msg); + msgs.add(new PendingMessage(msg)); while (msgs.size() > MAX) { - TcpDiscoveryAbstractMessage polled = msgs.poll(); + PendingMessage polled = msgs.poll(); assert polled != null; - if (polled.id().equals(discardId)) + if (polled.id.equals(discardId)) break; } } @@ -2051,6 +2194,7 @@ class ServerImpl extends TcpDiscoveryImpl { * * @param msgs Message. * @param discardId Discarded message ID. + * @param customDiscardId Discarded custom event message ID. */ void reset( @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @@ -2059,8 +2203,10 @@ class ServerImpl extends TcpDiscoveryImpl { ) { this.msgs.clear(); - if (msgs != null) - this.msgs.addAll(msgs); + if (msgs != null) { + for (TcpDiscoveryAbstractMessage msg : msgs) + this.msgs.add(new PendingMessage(msg)); + } this.discardId = discardId; this.customDiscardId = customDiscardId; @@ -2070,12 +2216,52 @@ class ServerImpl extends TcpDiscoveryImpl { * Discards message with provided ID and all before it. * * @param id Discarded message ID. + * @param custom {@code True} if discard for {@link TcpDiscoveryCustomEventMessage}. */ void discard(IgniteUuid id, boolean custom) { if (custom) customDiscardId = id; else discardId = id; + + cleanup(); + } + + /** + * + */ + void cleanup() { + Iterator<PendingMessage> msgIt = msgs.iterator(); + + boolean skipMsg = discardId != null; + boolean skipCustomMsg = customDiscardId != null; + + while (msgIt.hasNext()) { + PendingMessage msg = msgIt.next(); + + if (msg.customMsg) { + if (skipCustomMsg) { + assert customDiscardId != null; + + if (F.eq(customDiscardId, msg.id)) { + msg.msg = null; + + return; + } + } + } + else { + if (skipMsg) { + assert discardId != null; + + if (F.eq(discardId, msg.id)) { + msg.msg = null; + + return; + } + } + } + } } /** @@ -2098,7 +2284,7 @@ class ServerImpl extends TcpDiscoveryImpl { private boolean skipCustomMsg = customDiscardId != null; /** Internal iterator. */ - private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator(); + private Iterator<PendingMessage> msgIt = msgs.iterator(); /** Next message. */ private TcpDiscoveryAbstractMessage next; @@ -2136,13 +2322,13 @@ class ServerImpl extends TcpDiscoveryImpl { next = null; while (msgIt.hasNext()) { - TcpDiscoveryAbstractMessage msg0 = msgIt.next(); + PendingMessage msg0 = msgIt.next(); - if (msg0 instanceof TcpDiscoveryCustomEventMessage) { + if (msg0.customMsg) { if (skipCustomMsg) { assert customDiscardId != null; - if (F.eq(customDiscardId, msg0.id())) + if (F.eq(customDiscardId, msg0.id)) skipCustomMsg = false; continue; @@ -2152,14 +2338,17 @@ class ServerImpl extends TcpDiscoveryImpl { if (skipMsg) { assert discardId != null; - if (F.eq(discardId, msg0.id())) + if (F.eq(discardId, msg0.id)) skipMsg = false; continue; } } - next = msg0; + if (msg0.msg == null) + continue; + + next = msg0.msg; break; } @@ -2985,9 +3174,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (pendingMsgs.msgs.isEmpty()) return false; - for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) { - if (pendingMsg instanceof TcpDiscoveryNodeAddedMessage) { - TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg; + for (PendingMessage pendingMsg : pendingMsgs.msgs) { + if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg; if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0) return true; @@ -3901,8 +4090,7 @@ class ServerImpl extends TcpDiscoveryImpl { Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); if (data != null) - spi.onExchange(node.id(), node.id(), data, - U.resolveClassLoader(spi.ignite().configuration())); + spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration())); msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id())); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 1b99a56..80f4565 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -59,6 +59,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess } /** + * @param msg Message. + */ + public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) { + super(msg); + + nodeId = msg.nodeId; + clientDiscoData = msg.clientDiscoData; + clientNodeAttrs = msg.clientNodeAttrs; + } + + /** * Gets ID of the node added. * * @return ID of the node added. http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 6f8e14e..bd52c04 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -17,7 +17,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; @@ -234,14 +236,41 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param oldNodesDiscoData Discovery data from old nodes. + */ + public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) { + this.oldNodesDiscoData = oldNodesDiscoData; + } + + /** * @param nodeId Node ID. * @param discoData Discovery data to add. */ public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) { // Old nodes disco data may be null if message // makes more than 1 pass due to stopping of the nodes in topology. - if (oldNodesDiscoData != null) - oldNodesDiscoData.put(nodeId, discoData); + if (oldNodesDiscoData != null) { + for (Map.Entry<UUID, Map<Integer, byte[]>> existingDataEntry : oldNodesDiscoData.entrySet()) { + Map<Integer, byte[]> existingData = existingDataEntry.getValue(); + + Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry<Integer, byte[]> discoDataEntry = it.next(); + + byte[] curData = existingData.get(discoDataEntry.getKey()); + + if (Arrays.equals(curData, discoDataEntry.getValue())) + it.remove(); + } + + if (discoData.isEmpty()) + break; + } + + if (!discoData.isEmpty()) + oldNodesDiscoData.put(nodeId, discoData); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java new file mode 100644 index 0000000..ed186ac --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String AFF1_CACHE1 = "a1c1"; + + /** */ + private static final String AFF1_CACHE2 = "a1c2"; + + /** */ + private static final String AFF2_CACHE1 = "a2c1"; + + /** */ + private static final String AFF2_CACHE2 = "a2c2"; + + /** */ + private static final String AFF3_CACHE1 = "a3c1"; + + /** */ + private static final String AFF4_FILTER_CACHE1 = "a4c1"; + + /** */ + private static final String AFF4_FILTER_CACHE2 = "a4c2"; + + /** */ + private static final String AFF5_FILTER_CACHE1 = "a5c1"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class); + + cfg.setCommunicationSpi(commSpi); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE1); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE2); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE1); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE2); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF3_CACHE1); + ccfg.setBackups(3); + + RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64); + ccfg.setAffinity(aff); + + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE2); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF5_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testExchangeMessages() throws Exception { + ignite(0); + + startGrid(1); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + startGrid(2); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + client = true; + + startGrid(3); + + awaitPartitionMapExchange(); + + checkMessages(0, false); + + stopGrid(0); + + awaitPartitionMapExchange(); + + checkMessages(1, true); + } + + /** + * @param crdIdx Coordinator node index. + * @param checkSingle {@code True} if need check single messages. + */ + private void checkMessages(int crdIdx, boolean checkSingle) { + checkFullMessages(crdIdx); + + if (checkSingle) + checkSingleMessages(crdIdx); + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkFullMessages(int crdIdx) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage); + + checkFullMessage((GridDhtPartitionsFullMessage)msg); + } + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkSingleMessages(int crdIdx) { + int cnt = 0; + + for (Ignite ignite : Ignition.allGrids()) { + if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode()) + continue; + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage); + + checkSingleMessage((GridDhtPartitionsSingleMessage)msg); + } + + cnt++; + } + + assertTrue(cnt > 0); + } + + /** + * @param msg Message. + */ + private void checkFullMessage(GridDhtPartitionsFullMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param msg Message. + */ + private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkFullMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsFullMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions(); + + GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId); + + for (GridDhtPartitionMap2 map : emptyFullMap.values()) + assertEquals(0, map.map().size()); + + GridDhtPartitionFullMap fullMap = parts.get(dupCacheId); + + for (GridDhtPartitionMap2 map : fullMap.values()) + assertFalse(map.map().isEmpty()); + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkSingleMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsSingleMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionMap2> parts = msg.partitions(); + + GridDhtPartitionMap2 emptyMap = parts.get(cacheId); + + assertEquals(0, emptyMap.map().size()); + + GridDhtPartitionMap2 map = parts.get(dupCacheId); + + assertFalse(map.map().isEmpty()); + } + + /** + * + */ + private static class TestNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + // Do not start cache on coordinator. + return node.order() > 1; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java index 5dc059b..6c577c6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java @@ -630,7 +630,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra if (cacheMode() == LOCAL) return; - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java index 71d1182..3b0c2fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -72,6 +73,8 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + Boolean clientMode = client.get(); if (clientMode != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java index 2c47a1c..7b57d5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -115,21 +115,21 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri startGrid(2); startGrid(3); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); for (int i = 0; i < 2; i++) { stopGrid(3); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); startGrid(3); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); } startGrid(4); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); assert rs.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 5716d59..de38952 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -240,7 +240,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer)); waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer)); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); @@ -250,7 +250,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(1, 3); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); @@ -261,7 +261,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer)); waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer)); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); @@ -271,7 +271,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(1, 5); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); @@ -339,7 +339,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { concurrentStartFinished = true; - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); @@ -607,7 +607,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 5, 1); waitForRebalancing(4, 5, 1); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); @@ -631,7 +631,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 6); waitForRebalancing(4, 6); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); @@ -641,7 +641,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 7); waitForRebalancing(4, 7); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkSupplyContextMapIsEmpty(); @@ -650,7 +650,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 8); waitForRebalancing(4, 8); - awaitPartitionMapExchange(true, true); + awaitPartitionMapExchange(true, true, null); checkPartitionMapExchangeFinished(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index 87d02a5..cde6b8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ - private static final boolean DISCO_DEBUG_MODE = false; - /** * Constructs test. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java index 9b0637e..f3942d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java @@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest { /** Entry count. */ public static final int CNT = 100_000; - public static final String STATIC_CACHE_NAME = "static"; + + /** */ + private static final String STATIC_CACHE_NAME = "static"; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 1b7fe2b..d2cb710 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -537,7 +537,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = grid(i).affinity(null); - Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(); + Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(false); for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java index 5ecc27a..1259faf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java @@ -68,6 +68,13 @@ public class IgniteNoCustomEventsOnNodeStart extends GridCommonAbstractTest { assertFalse(failed); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 1ce98a5..043208c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; @@ -114,6 +115,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** */ private GridStringLogger strLog; + /** */ + private CacheConfiguration[] ccfgs; + + /** */ + private boolean client; + /** * @throws Exception If fails. */ @@ -152,7 +159,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(spi); - cfg.setCacheConfiguration(); + if (ccfgs != null) + cfg.setCacheConfiguration(ccfgs); + else + cfg.setCacheConfiguration(); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); @@ -194,9 +204,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode")) cfg.setFailureDetectionTimeout(30_000); - else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) { + else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) cfg.setFailureDetectionTimeout(3_000); - } else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) { cfg.setFailureDetectionTimeout(6_000); @@ -205,6 +214,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode")) cfg.setGridLogger(strLog = new GridStringLogger()); + cfg.setClientMode(client); + return cfg; } @@ -1961,6 +1972,63 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDuplicatedDiscoveryDataRemoved() throws Exception { + try { + TestDiscoveryDataDuplicateSpi.checkNodeAdded = false; + TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished = false; + TestDiscoveryDataDuplicateSpi.fail = false; + + ccfgs = new CacheConfiguration[5]; + + for (int i = 0; i < ccfgs.length; i++) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(i == 0 ? null : ("static-cache-" + i)); + + ccfgs[i] = ccfg; + } + + TestDiscoveryDataDuplicateSpi spi = new TestDiscoveryDataDuplicateSpi(); + + nodeSpi.set(spi); + + startGrid(0); + + for (int i = 0; i < 5; i++) { + nodeSpi.set(new TestDiscoveryDataDuplicateSpi()); + + startGrid(i + 1); + } + + client = true; + + Ignite clientNode = startGrid(6); + + assertTrue(clientNode.configuration().isClientMode()); + + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName("c1"); + + clientNode.createCache(ccfg); + + client = false; + + nodeSpi.set(new TestDiscoveryDataDuplicateSpi()); + + startGrid(7); + + assertTrue(TestDiscoveryDataDuplicateSpi.checkNodeAdded); + assertTrue(TestDiscoveryDataDuplicateSpi.checkClientNodeAddFinished); + assertFalse(TestDiscoveryDataDuplicateSpi.fail); + } + finally { + stopAllGrids(); + } + } + + /** * @param nodeName Node name. * @throws Exception If failed. */ @@ -2015,6 +2083,66 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } } + /** + * + */ + private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi { + /** */ + static volatile boolean fail; + + /** */ + static volatile boolean checkNodeAdded; + + /** */ + static volatile boolean checkClientNodeAddFinished; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddedMessage)msg).oldNodesDiscoveryData(); + + checkDiscoData(discoData, msg); + } + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData(); + + checkDiscoData(discoData, msg); + } + + super.writeToSocket(sock, out, msg, timeout); + } + + /** + * @param discoData Discovery data. + * @param msg Message. + */ + private void checkDiscoData(Map<UUID, Map<Integer, byte[]>> discoData, TcpDiscoveryAbstractMessage msg) { + if (discoData != null && discoData.size() > 1) { + int cnt = 0; + + for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) { + Map<Integer, byte[]> map = e.getValue(); + + if (map.containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal())) + cnt++; + } + + if (cnt > 1) { + fail = true; + + log.error("Expect cache data only from one node, but actually: " + cnt); + } + + if (msg instanceof TcpDiscoveryNodeAddedMessage) + checkNodeAdded = true; + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) + checkClientNodeAddFinished = true; + } + } + } + /** * http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 87509a4..22fa36d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -428,21 +428,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { */ @SuppressWarnings("BusyWait") protected void awaitPartitionMapExchange() throws InterruptedException { - awaitPartitionMapExchange(false, false); + awaitPartitionMapExchange(false, false, null); } /** * @param waitEvicts If {@code true} will wait for evictions finished. * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. + * @param nodes Optional nodes. * @throws InterruptedException If interrupted. */ @SuppressWarnings("BusyWait") - protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate) throws InterruptedException { + protected void awaitPartitionMapExchange(boolean waitEvicts, + boolean waitNode2PartUpdate, + @Nullable Collection<ClusterNode> nodes) + throws InterruptedException { long timeout = 30_000; + long startTime = -1; + + Set<String> names = new HashSet<>(); + for (Ignite g : G.allGrids()) { + if (nodes != null && !nodes.contains(g.cluster().localNode())) + continue; + IgniteKernal g0 = (IgniteKernal)g; + names.add(g0.configuration().getGridName()); + + if (startTime != -1) { + if (startTime != g0.context().discovery().gridStartTime()) + fail("Found nodes from different clusters, probable some test does not stop nodes " + + "[allNodes=" + names + ']'); + } + else + startTime = g0.context().discovery().gridStartTime(); + for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) { CacheConfiguration cfg = c.context().config();
