Repository: ignite Updated Branches: refs/heads/ignite-4154 098689c38 -> 0f9843ebb
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f9843eb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f9843eb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f9843eb Branch: refs/heads/ignite-4154 Commit: 0f9843ebb2de63acd110336642a90f38fce23696 Parents: 098689c Author: sboikov <[email protected]> Authored: Wed Nov 2 23:12:28 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 2 23:12:28 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 15 +++++- .../GridDhtPartitionsAbstractMessage.java | 34 ++++++++++++- .../GridDhtPartitionsExchangeFuture.java | 14 +++++- .../preloader/GridDhtPartitionsFullMessage.java | 53 ++++++++++++++++---- .../GridDhtPartitionsSingleMessage.java | 52 ++++++++++++++----- .../GridDhtPartitionsSingleRequest.java | 4 +- .../ignite/internal/util/IgniteUtils.java | 52 +++++++++++++++++++ 7 files changed, 194 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a901e2a..a81bf0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -765,12 +766,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); boolean useOldApi = false; + boolean compress = true; for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) + if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { useOldApi = true; + compress = false; + + break; + } + else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) + compress = false; } + m.compress(compress); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal() && cacheCtx.started()) { GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); @@ -817,7 +827,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), - cctx.versions().last()); + cctx.versions().last(), + node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 4e714ed..a3bb5f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable; /** * Request for single partition info. */ -abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { +public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { + /** */ + public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11"); + + /** */ + protected static final byte COMPRESSED_FLAG_MASK = 1; + /** */ private static final long serialVersionUID = 0L; @@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { /** Last used cache version. */ private GridCacheVersion lastVer; + /** */ + private byte flags; + /** * Required by {@link Externalizable}. */ @@ -79,6 +89,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { return lastVer; } + protected final boolean compressed() { + return (flags & COMPRESSED_FLAG_MASK) != 0; + } + + protected final void compressed(boolean compressed) { + flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -101,6 +119,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { writer.incrementState(); case 4: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 5: if (!writer.writeMessage("lastVer", lastVer)) return false; @@ -131,6 +155,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { reader.incrementState(); case 4: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: lastVer = reader.readMessage("lastVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 80b3768..6a17583 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -935,7 +935,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT throws IgniteCheckedException { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, clientOnlyExchange, - cctx.versions().last()); + cctx.versions().last(), + node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { @@ -974,14 +975,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT topologyVersion()); boolean useOldApi = false; + boolean compress = true; if (nodes != null) { for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) + if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { useOldApi = true; + compress = false; + + break; + } + else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) + compress = false; } } + m.compress(compress); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index a4ff04b..ea51f6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -61,6 +61,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Topology version. */ private AffinityTopologyVersion topVer; + /** */ + @GridDirectTransient + private boolean compress; + /** * Required by {@link Externalizable}. */ @@ -83,6 +87,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa this.topVer = topVer; } + public void compress(boolean compress) { + this.compress = compress; + } + /** * @return Local partitions. */ @@ -137,6 +145,21 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partCntrs != null && partCntrsBytes == null) partCntrsBytes = U.marshal(ctx, partCntrs); + + if (compress && !compressed()) { + try { + byte[] partsBytesZip = U.zip(partsBytes); + byte[] partCntrsBytesZip = U.zip(partCntrsBytes); + + partsBytes = partsBytesZip; + partCntrsBytes = partCntrsBytesZip; + + compressed(true); + } + catch (IgniteCheckedException e) { + U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); + } + } } /** @@ -157,14 +180,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) - parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partsBytes != null && parts == null) { + if (compressed()) + parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } if (parts == null) parts = new HashMap<>(); - if (partCntrsBytes != null && partCntrs == null) - partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partCntrsBytes != null && partCntrs == null) { + if (compressed()) + partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } if (partCntrs == null) partCntrs = new HashMap<>(); @@ -185,19 +216,19 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } switch (writer.state()) { - case 5: + case 6: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 6: + case 7: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); - case 7: + case 8: if (!writer.writeMessage("topVer", topVer)) return false; @@ -219,7 +250,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return false; switch (reader.state()) { - case 5: + case 6: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -227,7 +258,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 6: + case 7: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -235,7 +266,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 7: + case 8: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -255,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index e4356b1..fdfc485 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -59,6 +59,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** */ private boolean client; + /** */ + private boolean compress; + /** * Required by {@link Externalizable}. */ @@ -73,10 +76,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, - @Nullable GridCacheVersion lastVer) { + @Nullable GridCacheVersion lastVer, + boolean compress) { super(exchId, lastVer); this.client = client; + this.compress = compress; } /** @@ -141,17 +146,40 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partCntrsBytes == null && partCntrs != null) partCntrsBytes = U.marshal(ctx, partCntrs); + + if (compress && !compressed()) { + try { + byte[] partsBytesZip = U.zip(partsBytes); + byte[] partCntrsBytesZip = U.zip(partCntrsBytes); + + partsBytes = partsBytesZip; + partCntrsBytes = partCntrsBytesZip; + + compressed(true); + } + catch (IgniteCheckedException e) { + U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); + } + } } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null && parts == null) - parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partsBytes != null && parts == null) { + if (compressed()) + parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } - if (partCntrsBytes != null && partCntrs == null) - partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (partCntrsBytes != null && partCntrs == null) { + if (compressed()) + partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } } /** {@inheritDoc} */ @@ -169,19 +197,19 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } switch (writer.state()) { - case 5: + case 6: if (!writer.writeBoolean("client", client)) return false; writer.incrementState(); - case 6: + case 7: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 7: + case 8: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -203,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes return false; switch (reader.state()) { - case 5: + case 6: client = reader.readBoolean("client"); if (!reader.isLastRead()) @@ -211,7 +239,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 6: + case 7: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -219,7 +247,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 7: + case 8: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -239,7 +267,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index a4106af..850b6d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -81,11 +81,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 6; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionsSingleRequest.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0f9843eb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 1e8d648..da4edc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.util; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.Externalizable; @@ -128,6 +130,8 @@ import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; import javax.management.DynamicMBean; import javax.management.JMException; import javax.management.MBeanServer; @@ -9665,6 +9669,25 @@ public abstract class IgniteUtils { } } + public static <T> T unmarshalZip(Marshaller marsh, byte[] zipBytes, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { + assert marsh != null; + assert zipBytes != null; + + try { + ZipInputStream in = new ZipInputStream(new ByteArrayInputStream(zipBytes)); + + in.getNextEntry(); + + return marsh.unmarshal(in, clsLdr); + } + catch (IgniteCheckedException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + /** * Unmarshals object from the input stream using given class loader. * This method should not close given input stream. @@ -9880,4 +9903,33 @@ public abstract class IgniteUtils { if (oldName != curName) LOC_IGNITE_NAME.set(oldName); } + + public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException { + try { + if (bytes == null) + return null; + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + try (ZipOutputStream zos = new ZipOutputStream(bos)) { + ZipEntry entry = new ZipEntry(""); + + try { + entry.setSize(bytes.length); + + zos.putNextEntry(entry); + + zos.write(bytes); + } + finally { + zos.closeEntry(); + } + } + + return bos.toByteArray(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } }
