This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 531e0ff0420 IGNITE-18236 Cache objects transformation (#10393)
531e0ff0420 is described below
commit 531e0ff0420a7f4be2712f99a594acecc8c0437b
Author: Anton Vinogradov <[email protected]>
AuthorDate: Tue Mar 21 19:23:32 2023 +0300
IGNITE-18236 Cache objects transformation (#10393)
---
.../compress/CompressionProcessorImpl.java | 40 +-
.../AbstractCacheObjectCompressionTest.java | 278 ++++++++++++++
.../CacheObjectCompressionConsumptionTest.java | 412 +++++++++++++++++++++
.../transform/CacheObjectCompressionTest.java | 130 +++++++
.../compress/CompressionProcessorTest.java | 7 +-
.../processors/compress/FileSystemUtilsTest.java | 6 +-
.../testsuites/IgniteCompressionTestSuite.java | 41 ++
.../ignite/events/CacheObjectTransformedEvent.java | 86 +++++
.../java/org/apache/ignite/events/EventType.java | 13 +-
.../internal/ThreadLocalDirectByteBuffer.java | 94 +++++
.../ignite/internal/binary/BinaryObjectImpl.java | 129 +++++--
.../internal/binary/GridBinaryMarshaller.java | 3 +
.../binary/builder/BinaryBuilderReader.java | 3 +-
.../transform/CacheObjectTransformerManager.java | 45 +++
.../processors/cache/CacheObjectAdapter.java | 39 +-
.../internal/processors/cache/CacheObjectImpl.java | 10 +-
.../cache/CacheObjectTransformerUtils.java | 161 ++++++++
.../processors/cache/GridCacheProcessor.java | 6 +-
.../processors/cache/GridCacheSharedContext.java | 24 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 4 +-
.../snapshot/IgniteSnapshotManager.java | 2 +-
.../wal/reader/IgniteWalIteratorFactory.java | 2 +-
.../cacheobject/UserCacheObjectImpl.java | 7 +-
.../db/wal/IgniteWalIteratorSwitchSegmentTest.java | 2 +
.../pagemem/BPlusTreePageMemoryImplTest.java | 3 +-
.../BPlusTreeReuseListPageMemoryImplTest.java | 1 +
.../pagemem/IndexStoragePageMemoryImplTest.java | 3 +-
.../pagemem/PageMemoryImplNoLoadTest.java | 1 +
.../persistence/pagemem/PageMemoryImplTest.java | 1 +
.../AbstractCacheObjectTransformationTest.java | 340 +++++++++++++++++
.../CacheObjectTransformationEvolutionTest.java | 339 +++++++++++++++++
.../transform/CacheObjectTransformationTest.java | 164 ++++++++
.../TestCacheObjectTransformerManagerAdapter.java | 63 ++++
.../TestCacheObjectTransformerPluginProvider.java | 51 +++
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testsuites/IgniteCacheTestSuite13.java | 5 +
...dexQueryCacheKeyValueTransformedFieldsTest.java | 71 ++++
.../ignite/cache/query/IndexQueryTestSuite.java | 1 +
parent/pom.xml | 4 +-
39 files changed, 2480 insertions(+), 115 deletions(-)
diff --git
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
index d2e22caf702..49de6a6e6ef 100644
---
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
+++
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
import org.apache.ignite.internal.pagemem.PageUtils;
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
import
org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
@@ -48,10 +49,11 @@ import static
org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
*/
public class CompressionProcessorImpl extends CompressionProcessor {
/** Max page size. */
- private final ThreadLocalByteBuffer compactBuf = new
ThreadLocalByteBuffer(MAX_PAGE_SIZE);
+ private final ThreadLocalDirectByteBuffer compactBuf = new
ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
/** A bit more than max page size, extra space is required by compressors.
*/
- private final ThreadLocalByteBuffer compressBuf = new
ThreadLocalByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE));
+ private final ThreadLocalDirectByteBuffer compressBuf =
+ new
ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE),
NATIVE_BYTE_ORDER);
/**
* @param ctx Kernal context.
@@ -61,14 +63,6 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
super(ctx);
}
- /**
- * @param cap Capacity.
- * @return Direct byte buffer.
- */
- static ByteBuffer allocateDirectBuffer(int cap) {
- return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER);
- }
-
/** {@inheritDoc} */
@Override public void checkPageCompressionSupported() throws
IgniteCheckedException {
// No-op.
@@ -457,30 +451,4 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
decompressor.decompress(page, dst);
}
}
-
- /**
- */
- static final class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
- /** */
- final int size;
-
- /**
- * @param size Size.
- */
- ThreadLocalByteBuffer(int size) {
- this.size = size;
- }
-
- /** {@inheritDoc} */
- @Override protected ByteBuffer initialValue() {
- return allocateDirectBuffer(size);
- }
-
- /** {@inheritDoc} */
- @Override public ByteBuffer get() {
- ByteBuffer buf = super.get();
- buf.clear();
- return buf;
- }
- }
}
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectCompressionTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectCompressionTest.java
new file mode 100644
index 00000000000..a9451d88211
--- /dev/null
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectCompressionTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import com.github.luben.zstd.Zstd;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
+import org.xerial.snappy.Snappy;
+
+import static
org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED;
+
+/**
+ *
+ */
+public abstract class AbstractCacheObjectCompressionTest extends
AbstractCacheObjectTransformationTest {
+ /** Huge string. */
+ protected static final String HUGE_STRING;
+
+ static {
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < 1000; i++)
+ sb.append("A");
+
+ HUGE_STRING = sb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setPluginProviders(
+ new TestCacheObjectTransformerPluginProvider(new
CompressionTransformer()));
+ }
+
+ /**
+ *
+ */
+ protected static final class StringData {
+ /** S. */
+ private final String s;
+
+ /**
+ * @param s S.
+ */
+ public StringData(String s) {
+ this.s = s;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ StringData data = (StringData)o;
+
+ return Objects.equals(s, data.s);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(s);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class CompressionTransformer extends
TestCacheObjectTransformerManagerAdapter {
+ /** Comptession type. */
+ protected static volatile CompressionType type =
CompressionType.defaultType();
+
+ /** */
+ private static final LZ4Factory lz4Factory =
LZ4Factory.fastestInstance();
+
+ /** */
+ static final LZ4FastDecompressor lz4Decompressor =
lz4Factory.fastDecompressor();
+
+ /** */
+ static final LZ4Compressor lz4Compressor =
lz4Factory.highCompressor(1);
+
+ /** Direct byte buffer. */
+ private final ThreadLocalDirectByteBuffer src = new
ThreadLocalDirectByteBuffer();
+
+ /** Direct byte buffer. */
+ private final ThreadLocalDirectByteBuffer dst = new
ThreadLocalDirectByteBuffer();
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer transform(ByteBuffer original) {
+ if (type == CompressionType.DISABLED)
+ return null;
+
+ int overhead = 9; // Transformed flag + compression type + length.
+
+ int origSize = original.remaining();
+ int lim = origSize - overhead;
+
+ if (lim <= 0)
+ return null;
+
+ int maxCompLen;
+
+ switch (type) {
+ case ZSTD:
+ maxCompLen = (int)Zstd.compressBound(origSize);
+
+ break;
+
+ case LZ4:
+ maxCompLen = lz4Compressor.maxCompressedLength(origSize);
+
+ break;
+
+ case SNAPPY:
+ maxCompLen = Snappy.maxCompressedLength(origSize);
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ original = toDirect(original);
+
+ ByteBuffer compressed = dst.get(overhead + maxCompLen);
+
+ compressed.put(TRANSFORMED);
+ compressed.putInt(type.ordinal());
+ compressed.putInt(origSize);
+
+ assertEquals(overhead, compressed.position());
+
+ int size;
+
+ switch (type) {
+ case ZSTD:
+ size = Zstd.compress(compressed, original, 1);
+
+ compressed.flip();
+
+ break;
+
+ case LZ4:
+ lz4Compressor.compress(original, compressed);
+
+ size = compressed.position() - overhead;
+
+ compressed.flip();
+
+ break;
+
+ case SNAPPY:
+ try {
+ size = Snappy.compress(original, compressed);
+
+ compressed.position(0);
+ }
+ catch (IOException e) {
+ return null;
+ }
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ if (size >= lim)
+ return null; // Compression is not profitable.
+
+ return compressed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer restore(ByteBuffer transformed) {
+ CompressionType type =
CompressionType.values()[transformed.getInt()];
+ int length = transformed.getInt();
+
+ transformed = toDirect(transformed);
+
+ ByteBuffer restored = dst.get(length);
+
+ switch (type) {
+ case ZSTD:
+ Zstd.decompress(restored, transformed);
+
+ restored.flip();
+
+ break;
+
+ case LZ4:
+ lz4Decompressor.decompress(transformed, restored);
+
+ restored.flip();
+
+ break;
+
+ case SNAPPY:
+ try {
+ Snappy.uncompress(transformed, restored);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ return restored;
+ }
+
+ /**
+ * Some libs may require direct byte buffers.
+ *
+ * @param buf Buffer.
+ */
+ private ByteBuffer toDirect(ByteBuffer buf) {
+ assertFalse(buf.isDirect());
+
+ ByteBuffer direct = src.get(buf.remaining());
+
+ direct.put(buf);
+ direct.flip();
+
+ return direct;
+ }
+
+ /**
+ *
+ */
+ protected enum CompressionType {
+ /** Compression disabled. */
+ DISABLED,
+
+ /** Zstd compression. */
+ ZSTD,
+
+ /** LZ4 compression. */
+ LZ4,
+
+ /** Snappy compression. */
+ SNAPPY;
+
+ /**
+ * @return default.
+ */
+ static CompressionType defaultType() {
+ return ZSTD;
+ }
+ }
+ }
+}
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
new file mode 100644
index 00000000000..38ce530e530
--- /dev/null
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.commons.io.FileUtils;
+import org.apache.ignite.DataRegionMetrics;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+import static
org.apache.ignite.internal.processors.metric.GridMetricManager.CLIENT_CONNECTOR_METRICS;
+import static
org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_NAME;
+import static
org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheObjectCompressionConsumptionTest extends
AbstractCacheObjectCompressionTest {
+ /** Region name. */
+ private static final String REGION_NAME = "region";
+
+ /** Thin client. */
+ @Parameterized.Parameter
+ public ConsumptionTestMode mode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "mode={0}")
+ public static Collection<?> parameters() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (ConsumptionTestMode mode : ConsumptionTestMode.values())
+ res.add(new Object[] {mode});
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ DataRegionConfiguration drCgf = new DataRegionConfiguration()
+ .setName(REGION_NAME)
+ .setMetricsEnabled(true)
+ .setMaxSize(1000L * 1024 * 1024)
+ .setInitialSize(1000L * 1024 * 1024);
+
+ if (mode == ConsumptionTestMode.PERSISTENT)
+ drCgf.setPersistenceEnabled(true);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(drCgf)
+ .setMetricsEnabled(true));
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @org.junit.Test
+ public void testString() throws Exception {
+ testConsumption((i) -> i, this::hugeValue);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @org.junit.Test
+ public void testWrappedString() throws Exception {
+ testConsumption((i) -> i, (i) -> new StringData(hugeValue(i)));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @org.junit.Test
+ public void testStringArray() throws Exception {
+ testConsumption((i) -> i, (i) -> new String[] {hugeValue(i)});
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @org.junit.Test
+ public void testWrappedStringArray() throws Exception {
+ testConsumption((i) -> i, (i) -> new StringData[] {new
StringData(hugeValue(i))});
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_USE_BINARY_ARRAYS,
value = "true")
+ @org.junit.Test
+ public void testWrappedStringBinaryArray() throws Exception {
+ testWrappedStringArray();
+ }
+
+ /**
+ *
+ */
+ @org.junit.Test
+ public void testIncompressible() {
+ GridTestUtils.assertThrowsWithCause(
+ () -> {
+ testConsumption((i) -> i, (i) -> i);
+
+ return null;
+ }, AssertionError.class);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testConsumption(Function<Integer, Object> keyGen,
Function<Integer, Object> valGen) throws Exception {
+ List<Integer> cnts = new ArrayList<>();
+ List<Consumption> raws = new ArrayList<>();
+ List<Consumption> comps = new ArrayList<>();
+
+ for (int i = 1; i <= 4; i++) {
+ int cnt = 2000 + i * 1000; // At least 2000 entries to guarantee
compression profit.
+
+ Consumption raw;
+ Consumption compressed;
+
+ CompressionTransformer.type =
CompressionTransformer.CompressionType.DISABLED;
+
+ raw = doTest(cnt, keyGen, valGen); // Compression disabled.
+
+ CompressionTransformer.type =
CompressionTransformer.CompressionType.defaultType();
+
+ compressed = doTest(cnt, keyGen, valGen); // Compression enabled.
+
+ assertTrue("Network, raw=" + raw.net + ", compressed=" +
compressed.net, raw.net > compressed.net);
+ assertTrue("Memory, raw=" + raw.mem + ", compressed=" +
compressed.mem, raw.mem > compressed.mem);
+
+ if (mode == ConsumptionTestMode.PERSISTENT)
+ assertTrue("Persistence, raw=" + raw.persist + ", compressed="
+ compressed.persist,
+ raw.persist > compressed.persist);
+
+ cnts.add(cnt);
+ raws.add(raw);
+ comps.add(compressed);
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("\nComparison results [mode=").append(mode).append("]:");
+
+ for (int i = 0; i < cnts.size(); i++) {
+ long rn = raws.get(i).net;
+ long cn = comps.get(i).net;
+ long rm = raws.get(i).mem;
+ long cm = comps.get(i).mem;
+ long rp = raws.get(i).persist;
+ long cp = comps.get(i).persist;
+
+ sb.append("\nEntries=")
+ .append(cnts.get(i))
+ .append("\n\t")
+ .append("Network [raw=")
+ .append(rn)
+ .append(", compressed=")
+ .append(cn)
+ .append(", profit=")
+ .append((rn - cn) * 100 / rn)
+ .append("%],\n\t")
+ .append("Memory [raw=")
+ .append(rm)
+ .append(", compressed=")
+ .append(cm)
+ .append(", profit=")
+ .append((rm - cm) * 100 / rm)
+ .append("%],\n\t")
+ .append("Persistence [raw=")
+ .append(rp)
+ .append(", compressed=")
+ .append(cp)
+ .append(", profit=")
+ .append((mode == ConsumptionTestMode.PERSISTENT) ? (rp - cp) *
100 / rp : "NA")
+ .append("%]");
+ }
+
+ for (int i = 1; i < cnts.size(); i++) {
+ long rnd = raws.get(i).net - raws.get(i - 1).net;
+ long cnd = comps.get(i).net - comps.get(i - 1).net;
+ long rmd = raws.get(i).mem - raws.get(i - 1).mem;
+ long cmd = comps.get(i).mem - comps.get(i - 1).mem;
+ long rpd = raws.get(i).persist - raws.get(i - 1).persist;
+ long cpd = comps.get(i).persist - comps.get(i - 1).persist;
+
+ assertTrue(rnd > 0);
+ assertTrue(cnd > 0);
+ assertTrue(rmd > 0);
+ assertTrue(cmd > 0);
+
+ assertTrue(mode == ConsumptionTestMode.PERSISTENT ? rpd > 0 : rpd
== 0);
+ assertTrue(mode == ConsumptionTestMode.PERSISTENT ? cpd > 0 : cpd
== 0);
+
+ sb.append("\nDiff [entries=")
+ .append(cnts.get(i - 1))
+ .append("->")
+ .append(cnts.get(i))
+ .append("]\n\t")
+ .append("Network [raw=")
+ .append(rnd)
+ .append(", compressed=")
+ .append(cnd)
+ .append(", profit=")
+ .append((rnd - cnd) * 100 / rnd)
+ .append("%],\n\t")
+ .append("Memory [raw=")
+ .append(rmd)
+ .append(", compressed=")
+ .append(cmd)
+ .append(", profit=")
+ .append((rmd - cmd) * 100 / rmd)
+ .append("%],\n\t")
+ .append("Persistence [raw=")
+ .append(rpd)
+ .append(", compressed=")
+ .append(cpd)
+ .append(", profit=")
+ .append((mode == ConsumptionTestMode.PERSISTENT) ? (rpd - cpd)
* 100 / rpd : "NA")
+ .append("%]");
+ }
+
+ log.info(sb.toString());
+ }
+
+ /**
+ *
+ */
+ private Consumption doTest(int cnt, Function<Integer, Object> keyGen,
Function<Integer, Object> valGen) throws Exception {
+ try {
+ cleanPersistenceDir();
+
+ Ignite ignite = startGrids(2);
+
+ if (mode == ConsumptionTestMode.PERSISTENT)
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite node : G.allGrids())
+ node.configuration().getCommunicationSpi().resetMetrics();
+
+ Ignite prim = primaryNode(0, CACHE_NAME);
+
+ if (mode == ConsumptionTestMode.THIN_CLIENT) {
+ String host = prim.configuration().getLocalHost();
+ int port =
prim.configuration().getClientConnectorConfiguration().getPort();
+
+ try (IgniteClient client = G.startClient(new
ClientConfiguration().setAddresses(host + ":" + port))) {
+ ClientCache<Object, Object> cache =
client.cache(CACHE_NAME);
+
+ for (int i = 0; i < cnt; i++) {
+ Object key = keyGen.apply(i);
+ Object val = valGen.apply(i);
+
+ cache.put(key, val);
+
+ assertEqualsArraysAware(cache.get(key), val);
+ }
+ }
+ }
+ else {
+ IgniteCache<Object, Object> cache =
prim.getOrCreateCache(CACHE_NAME);
+
+ for (int i = 0; i < cnt; i++) {
+ Object key = keyGen.apply(i);
+ Object val = valGen.apply(i);
+
+ cache.put(key, val);
+
+ assertEqualsArraysAware(cache.get(key), val);
+ }
+ }
+
+ long net = 0;
+ long mem = 0;
+ long pers = 0;
+
+ for (Ignite node : G.allGrids()) {
+ if (mode == ConsumptionTestMode.PERSISTENT)
+ forceCheckpoint(node);
+
+ CommunicationSpi<?> spi =
node.configuration().getCommunicationSpi();
+
+ net += spi.getSentBytesCount();
+ net += spi.getReceivedBytesCount();
+
+ long clNet = 0;
+
+ MetricRegistry reg = mreg(node, CLIENT_CONNECTOR_METRICS);
+
+ clNet +=
reg.<LongMetric>findMetric(SENT_BYTES_METRIC_NAME).value();
+ clNet +=
reg.<LongMetric>findMetric(RECEIVED_BYTES_METRIC_NAME).value();
+
+ if (mode != ConsumptionTestMode.THIN_CLIENT)
+ assertEquals(0, clNet);
+
+ net += clNet;
+
+ DataRegionMetrics metrics =
node.dataRegionMetrics(REGION_NAME);
+
+ mem += metrics.getTotalAllocatedSize();
+
+ String nodeFolder =
((IgniteEx)node).context().pdsFolderResolver().resolveFolders().folderName();
+
+ pers += FileUtils.sizeOfDirectory(
+ U.resolveWorkDirectory(U.defaultWorkDirectory(),
DFLT_STORE_DIR + "/" + nodeFolder, false));
+
+ if (mode != ConsumptionTestMode.PERSISTENT)
+ assertEquals(0, pers);
+ }
+
+ return new Consumption(net, mem, pers);
+ }
+ finally {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+ }
+
+ /** Obtains the metric registry with the specified name from Ignite
instance. */
+ private MetricRegistry mreg(Ignite ignite, String name) {
+ return ((IgniteEx)ignite).context().metric().registry(name);
+ }
+
+ /***/
+ private String hugeValue(int i) {
+ return HUGE_STRING + i;
+ }
+
+ /***/
+ private static class Consumption {
+ /** Network. */
+ long net;
+
+ /** Memory. */
+ long mem;
+
+ /** Persistence. */
+ long persist;
+
+ /***/
+ public Consumption(long net, long mem, long persist) {
+ this.net = net;
+ this.mem = mem;
+ this.persist = persist;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "[net=" + net + ", mem=" + mem + ", pers=" + persist + ']';
+ }
+ }
+
+ /**
+ *
+ */
+ private enum ConsumptionTestMode {
+ /** Node. */
+ NODE,
+
+ /** Thin client. */
+ THIN_CLIENT,
+
+ /** Node + Persistent. */
+ PERSISTENT
+ }
+}
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionTest.java
new file mode 100644
index 00000000000..84aff7d68a7
--- /dev/null
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.internal.processors.cache.transform.AbstractCacheObjectCompressionTest.CompressionTransformer.CompressionType;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheObjectCompressionTest extends
AbstractCacheObjectCompressionTest {
+ /** Thin client. */
+ @Parameterized.Parameter
+ public CompressionType type;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "type={0}")
+ public static Collection<?> parameters() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (CompressionType type : CompressionType.values())
+ res.add(new Object[] {type});
+
+ return res;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCompression() throws Exception {
+ try {
+ CompressionTransformer.type = type;
+
+ Ignite ignite = prepareCluster();
+
+ int i = 42;
+
+ putAndGet(i, false); // No chances to compress integer.
+
+ String str = "Test string";
+
+ putAndGet(str, false); // Too short string.
+
+ StringData sd = new StringData(str);
+
+ putAndGet(sd, false); // Too short wrapped string.
+
+ List<Object> sdList = Collections.singletonList(sd);
+
+ putAndGet(sdList, false); // Too short wrapped string.
+
+ StringBuilder sb = new StringBuilder();
+
+ for (int k = 0; k < 100; k++)
+ sb.append("AAAAAAAAAA");
+
+ String str2 = sb.toString();
+
+ putAndGet(str2, type != CompressionType.DISABLED);
+
+ List<Object> list = new ArrayList<>();
+
+ list.add(new BinarizableData(str, null, i));
+
+ putAndGet(list, false); // Too short list.
+
+ // Adding more elements.
+ list.add(new BinarizableData(str + 1 /*avoiding possible 'as
handle' marshalling*/, null, i));
+ list.add(new BinarizableData(str + 2 /*avoiding possible 'as
handle' marshalling*/, null, i));
+
+ putAndGet(list, type != CompressionType.DISABLED); // Enough to be
compressed.
+
+ BinarizableData data = new BinarizableData(str, list, i);
+
+ putAndGet(data, type != CompressionType.DISABLED);
+
+ BinaryObjectBuilder builder =
ignite.binary().builder(BinarizableData.class.getName());
+
+ builder.setField("str", str2); // Wrapped string, enough to be
compressed.
+ builder.setField("list", list); // Wrapped strings, enough to be
compressed.
+ builder.setField("i", i);
+
+ putAndGet(builder.build(), type != CompressionType.DISABLED); //
Enough to be compressed.
+
+ builder.setField("str", str);
+
+ putAndGet(builder.build(), type != CompressionType.DISABLED); //
Still enough to be compressed.
+
+ builder.setField("list", null);
+
+ putAndGet(builder.build(), false); // Too short wrapped string.
+ }
+ finally {
+ CompressionTransformer.type =
CompressionTransformer.CompressionType.defaultType(); // Restoring default.
+ }
+ }
+
+ /**
+ *
+ */
+ private void putAndGet(Object val, boolean compressible) {
+ putAndGet(val, compressible, false);
+ }
+}
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
index f499059a1fe..5985bb78619 100644
---
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
@@ -55,7 +55,7 @@ import static
org.apache.ignite.internal.processors.compress.CompressionProcesso
import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.UNCOMPRESSED_PAGE;
import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_DEFAULT_LEVEL;
import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
-import static
org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
/**
@@ -347,6 +347,11 @@ public class CompressionProcessorTest extends
GridCommonAbstractTest {
return PageUtils.getBytes(GridUnsafe.bufferAddress(page), 0,
PageIO.COMMON_HEADER_END);
}
+ /** */
+ private ByteBuffer allocateDirectBuffer(int cap) {
+ return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER);
+ }
+
/**
*/
private static class Bytes {
diff --git
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
index be5e8969cdc..9a1f9484eae 100644
---
a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
+++
b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
@@ -30,10 +30,10 @@ import org.junit.Test;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
-import static
org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer;
import static
org.apache.ignite.internal.processors.compress.FileSystemUtils.getFileSystemBlockSize;
import static
org.apache.ignite.internal.processors.compress.FileSystemUtils.getSparseFileSize;
import static
org.apache.ignite.internal.processors.compress.FileSystemUtils.punchHole;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -194,4 +194,8 @@ public class FileSystemUtilsTest {
}
}
+ /** */
+ private ByteBuffer allocateDirectBuffer(int cap) {
+ return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER);
+ }
}
diff --git
a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgniteCompressionTestSuite.java
b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgniteCompressionTestSuite.java
new file mode 100644
index 00000000000..ef5129f4b05
--- /dev/null
+++
b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgniteCompressionTestSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import java.util.ArrayList;
+import java.util.List;
+import
org.apache.ignite.internal.processors.cache.transform.CacheObjectCompressionConsumptionTest;
+import
org.apache.ignite.internal.processors.cache.transform.CacheObjectCompressionTest;
+import org.apache.ignite.testframework.junits.DynamicSuite;
+import org.junit.runner.RunWith;
+
+/** */
+@RunWith(DynamicSuite.class)
+public class IgniteCompressionTestSuite {
+ /**
+ * @return Suite.
+ */
+ public static List<Class<?>> suite() {
+ List<Class<?>> suite = new ArrayList<>();
+
+ suite.add(CacheObjectCompressionTest.class);
+ suite.add(CacheObjectCompressionConsumptionTest.class);
+
+ return suite;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/events/CacheObjectTransformedEvent.java
b/modules/core/src/main/java/org/apache/ignite/events/CacheObjectTransformedEvent.java
new file mode 100644
index 00000000000..42b31c8a58d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/events/CacheObjectTransformedEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ *
+ */
+@IgniteExperimental
+public class CacheObjectTransformedEvent extends EventAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Original cache object bytes. */
+ private final byte[] original;
+
+ /** Transformed cache object bytes. */
+ private final byte[] transformed;
+
+ /** Restore operation. */
+ private final boolean restore;
+
+ /**
+ * @param node Node.
+ * @param msg Message.
+ * @param type Type.
+ * @param original Original cache object bytes.
+ * @param transformed Transformed cache object bytes.
+ * @param restore Restore operation flag.
+ */
+ public CacheObjectTransformedEvent(
+ ClusterNode node,
+ String msg,
+ int type,
+ byte[] original,
+ byte[] transformed,
+ boolean restore
+ ) {
+ super(node, msg, type);
+
+ assert original != null;
+ assert transformed != null;
+
+ this.original = original.clone();
+ this.transformed = transformed.clone();
+ this.restore = restore;
+ }
+
+ /**
+ * @return Original bytes.
+ */
+ public byte[] getOriginal() {
+ return original;
+ }
+
+ /**
+ * @return Transformed bytes.
+ */
+ public byte[] getTransformed() {
+ return transformed;
+ }
+
+ /**
+ * @return True at restore operation.
+ */
+ public boolean isRestore() {
+ return restore;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index f82c3916d4c..cdacb5cede3 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -526,6 +526,16 @@ public interface EventType {
*/
public static final int EVT_CACHE_OBJECT_UNLOCKED = 67;
+ /**
+ * Built-in event type: cache object was transformed.
+ * <p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see CacheEvent
+ */
+ public static final int EVT_CACHE_OBJECT_TRANSFORMED = 68;
+
/**
* Built-in event type: cache object was expired when reading it.
* <p>
@@ -1162,7 +1172,8 @@ public interface EventType {
EVT_CACHE_OBJECT_REMOVED,
EVT_CACHE_OBJECT_LOCKED,
EVT_CACHE_OBJECT_UNLOCKED,
- EVT_CACHE_OBJECT_EXPIRED
+ EVT_CACHE_OBJECT_EXPIRED,
+ EVT_CACHE_OBJECT_TRANSFORMED
};
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/ThreadLocalDirectByteBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/ThreadLocalDirectByteBuffer.java
new file mode 100644
index 00000000000..3f25281dfe4
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/ThreadLocalDirectByteBuffer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Thread local direct byte buffer.
+ */
+public class ThreadLocalDirectByteBuffer extends ThreadLocal<ByteBuffer> {
+ /** */
+ private final int size;
+
+ /** Byte order. */
+ private final ByteOrder order;
+
+ /** */
+ public ThreadLocalDirectByteBuffer() {
+ this(-1); // Avoiding useless initialization.
+ }
+
+ /** */
+ public ThreadLocalDirectByteBuffer(int size) {
+ this(size, null);
+ }
+
+ /** */
+ public ThreadLocalDirectByteBuffer(int size, ByteOrder order) {
+ this.size = size;
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ByteBuffer initialValue() {
+ return size > 0 ? allocateDirectBuffer(size) : null;
+ }
+
+ /** */
+ public ByteBuffer get(int capacity) {
+ assert capacity > 0 : capacity;
+
+ ByteBuffer buf = super.get();
+
+ if (buf == null || buf.capacity() < capacity) {
+ buf = allocateDirectBuffer(capacity);
+
+ set(buf);
+ }
+ else {
+ buf.clear();
+
+ buf.limit(capacity);
+ }
+
+ return buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer get() {
+ assert size > 0 : size;
+
+ ByteBuffer buf = super.get();
+
+ buf.clear();
+
+ return buf;
+ }
+
+ /** */
+ protected ByteBuffer allocateDirectBuffer(int capacity) {
+ ByteBuffer buf = ByteBuffer.allocateDirect(capacity);
+
+ if (order != null)
+ buf = buf.order(order);
+
+ return buf;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index ad31f2ffc04..8a89e429552 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectTransformerUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
@@ -48,6 +49,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED;
/**
* Binary object implementation.
@@ -62,9 +64,14 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
private BinaryContext ctx;
/** */
+ @GridDirectTransient
private byte[] arr;
+ /** Bytes to be stored or transferred instead of raw binary array. */
+ private byte[] valBytes;
+
/** */
+ @GridDirectTransient
private int start;
/** */
@@ -94,18 +101,54 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
assert ctx != null;
assert arr != null;
+ assert arr[start] != TRANSFORMED; // Raw array should never be
transformed.
+
this.ctx = ctx;
this.arr = arr;
this.start = start;
}
+ /**
+ * @param ctx Context.
+ * @param bytes Array/ValBytes.
+ */
+ public BinaryObjectImpl(BinaryContext ctx, byte[] bytes) {
+ assert ctx != null;
+ assert bytes != null;
+
+ this.ctx = ctx;
+
+ assert bytes[0] != TRANSFORMED; // Raw array should never be
transformed.
+
+ arr = bytes;
+ valBytes = bytes;
+ }
+
+ /**
+ * @param ctx Context.
+ * @param valBytes Value bytes.
+ * @param coCtx Cache object context.
+ */
+ public BinaryObjectImpl(BinaryContext ctx, byte[] valBytes,
CacheObjectContext coCtx) {
+ assert ctx != null;
+ assert valBytes != null;
+ assert coCtx != null;
+
+ this.ctx = ctx;
+ this.valBytes = valBytes;
+
+ arr = arrayFromValueBytes(coCtx);
+ }
+
/** {@inheritDoc} */
@Override public KeyCacheObject copy(int part) {
if (this.part == part)
return this;
BinaryObjectImpl cp = new BinaryObjectImpl(ctx, arr, start);
+
cp.part = part;
+ cp.valBytes = arr; // Keys should never be transformed.
return cp;
}
@@ -117,7 +160,11 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
/** {@inheritDoc} */
@Override public void partition(int part) {
+ assert part >= 0;
+
this.part = part;
+
+ valBytes = arr; // Keys should never be transformed.
}
/** {@inheritDoc} */
@@ -156,48 +203,45 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
/** {@inheritDoc} */
@Override public byte[] valueBytes(CacheObjectValueContext ctx) throws
IgniteCheckedException {
- if (detached())
- return array();
-
- int len = length();
-
- byte[] arr0 = new byte[len];
-
- U.arrayCopy(arr, start, arr0, 0, len);
-
- return arr0;
+ return valBytes;
}
/** {@inheritDoc} */
@Override public boolean putValue(ByteBuffer buf) throws
IgniteCheckedException {
- return putValue(buf, 0, CacheObjectAdapter.objectPutSize(length()));
+ return putValue(buf, 0,
CacheObjectAdapter.objectPutSize(valBytes.length));
}
/** {@inheritDoc} */
@Override public int putValue(long addr) throws IgniteCheckedException {
- return CacheObjectAdapter.putValue(addr, cacheObjectType(), arr,
start, length());
+ return CacheObjectAdapter.putValue(addr, cacheObjectType(), valBytes,
0, valBytes.length);
}
/** {@inheritDoc} */
@Override public boolean putValue(final ByteBuffer buf, int off, int len)
throws IgniteCheckedException {
- return CacheObjectAdapter.putValue(cacheObjectType(), buf, off, len,
arr, start);
+ return CacheObjectAdapter.putValue(cacheObjectType(), buf, off, len,
valBytes, 0);
}
/** {@inheritDoc} */
@Override public int valueBytesLength(CacheObjectContext ctx) throws
IgniteCheckedException {
- return CacheObjectAdapter.objectPutSize(length());
+ return CacheObjectAdapter.objectPutSize(valBytes.length);
}
/** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectContext ctx) {
- if (detached())
- return this;
+ BinaryObjectImpl res = detached() ? this : detach();
+
+ res.prepareMarshal(ctx);
- return (BinaryObjectImpl)detach();
+ return res;
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(CacheObjectValueContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ assert arr != null || valBytes != null;
+
+ if (arr == null)
+ arr = arrayFromValueBytes(ctx);
+
CacheObjectBinaryProcessorImpl binaryProc =
(CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
this.ctx = binaryProc.binaryContext();
@@ -206,8 +250,27 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(CacheObjectValueContext ctx) throws
IgniteCheckedException {
- // No-op.
+ @Override public void prepareMarshal(CacheObjectValueContext ctx) {
+ assert arr != null || valBytes != null;
+
+ if (valBytes == null)
+ valBytes = valueBytesFromArray(ctx);
+ }
+
+ /**
+ * @return Array.
+ */
+ private byte[] arrayFromValueBytes(CacheObjectValueContext ctx) {
+ return CacheObjectTransformerUtils.restoreIfNecessary(valBytes, ctx);
+ }
+
+ /**
+ * @return Value bytes.
+ */
+ private byte[] valueBytesFromArray(CacheObjectValueContext ctx) {
+ assert part == -1; // Keys should never be transformed.
+
+ return CacheObjectTransformerUtils.transformIfNecessary(arr, start,
length(), ctx);
}
/** {@inheritDoc} */
@@ -737,10 +800,10 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
switch (writer.state()) {
case 0:
- if (!writer.writeByteArray("arr",
- arr,
- detachAllowed ? start : 0,
- detachAllowed ? length() : arr.length))
+ if (!writer.writeByteArray("valBytes",
+ valBytes,
+ 0,
+ valBytes.length))
return false;
writer.incrementState();
@@ -750,13 +813,6 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
return false;
writer.incrementState();
-
- case 2:
- if (!writer.writeInt("start", detachAllowed ? 0 : start))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -771,7 +827,7 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
switch (reader.state()) {
case 0:
- arr = reader.readByteArray("arr");
+ valBytes = reader.readByteArray("valBytes");
if (!reader.isLastRead())
return false;
@@ -785,15 +841,6 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
return false;
reader.incrementState();
-
- case 2:
- start = reader.readInt("start");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(BinaryObjectImpl.class);
@@ -806,7 +853,7 @@ public final class BinaryObjectImpl extends
BinaryObjectExImpl implements Extern
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 2;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
index b7ca3964de7..86aaaa40059 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
@@ -39,6 +39,9 @@ public class GridBinaryMarshaller {
/** Flag whether class caching should be used by the current thread. */
public static final ThreadLocal<Boolean> USE_CACHE =
ThreadLocal.withInitial(() -> Boolean.TRUE);
+ /** */
+ public static final byte TRANSFORMED = -3;
+
/** */
public static final byte OPTM_MARSH = -2;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
index 3ac9f862707..34f877c87b5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
@@ -843,8 +843,7 @@ public class BinaryBuilderReader implements
BinaryPositionReadable {
int start = readInt();
- BinaryObjectImpl binaryObj = new BinaryObjectImpl(ctx, arr,
- pos - 4 - size + start);
+ BinaryObjectImpl binaryObj = new BinaryObjectImpl(ctx, arr,
pos - 4 - size + start);
return new BinaryPlainBinaryObject(binaryObj);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/transform/CacheObjectTransformerManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/transform/CacheObjectTransformerManager.java
new file mode 100644
index 00000000000..3d693589559
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/transform/CacheObjectTransformerManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cache.transform;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides cache object's bytes transformation (eg. encryption, compression,
etc).
+ */
+public interface CacheObjectTransformerManager extends GridCacheSharedManager {
+ /**
+ * Transforms the data.
+ *
+ * @param original Original data.
+ * @return Transformed data (started with {@link
GridBinaryMarshaller#TRANSFORMED} when restorable)
+ * or {@code null} when transformation is not possible/suitable.
+ */
+ public @Nullable ByteBuffer transform(ByteBuffer original);
+
+ /**
+ * Restores the data.
+ *
+ * @param transformed Transformed data.
+ * @return Restored data.
+ */
+ public ByteBuffer restore(ByteBuffer transformed);
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 8f1ac58fc0f..b04f452d7a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -38,6 +38,9 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
/** */
private static final long serialVersionUID = 2006765505127197251L;
+ /** Head size. */
+ protected static final int HEAD_SIZE = 5; // 4 bytes len + 1 byte type
+
/** */
@GridToStringInclude(sensitive = true)
@GridDirectTransient
@@ -54,6 +57,24 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
return ctx.copyOnGet() && val != null &&
!ctx.kernalContext().cacheObjects().immutable(val);
}
+ /**
+ * @return Value bytes from value.
+ */
+ protected byte[] valueBytesFromValue(CacheObjectValueContext ctx) throws
IgniteCheckedException {
+ byte[] bytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
+
+ return CacheObjectTransformerUtils.transformIfNecessary(bytes, ctx);
+ }
+
+ /**
+ * @return Value from value bytes.
+ */
+ protected Object valueFromValueBytes(CacheObjectValueContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ byte[] bytes =
CacheObjectTransformerUtils.restoreIfNecessary(valBytes, ctx);
+
+ return ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, ldr);
+ }
+
/** {@inheritDoc} */
@Override public byte cacheObjectType() {
return TYPE_REGULAR;
@@ -195,7 +216,7 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
* @see #putValue(byte, ByteBuffer, int, int, byte[], int)
*/
public static int objectPutSize(int dataLen) {
- return dataLen + 5;
+ return dataLen + HEAD_SIZE;
}
/**
@@ -220,19 +241,17 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
if (buf.remaining() < len)
return false;
- final int headSize = 5; // 4 bytes len + 1 byte type
-
- if (off == 0 && len >= headSize) {
+ if (off == 0 && len >= HEAD_SIZE) {
buf.putInt(dataLen);
buf.put(cacheObjType);
- len -= headSize;
+ len -= HEAD_SIZE;
}
- else if (off >= headSize)
- off -= headSize;
+ else if (off >= HEAD_SIZE)
+ off -= HEAD_SIZE;
else {
// Partial header write.
- final ByteBuffer head = ByteBuffer.allocate(headSize);
+ final ByteBuffer head = ByteBuffer.allocate(HEAD_SIZE);
head.order(buf.order());
@@ -246,10 +265,10 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
buf.put(head);
- if (head.limit() < headSize)
+ if (head.limit() < HEAD_SIZE)
return true;
- len -= headSize - off;
+ len -= HEAD_SIZE - off;
off = 0;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index 46ec782ee9c..89b03e8d843 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -71,7 +71,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
if (valBytes == null) {
assert val != null;
- valBytes = proc.marshal(ctx, val);
+ valBytes = valueBytesFromValue(ctx);
}
if (ldr == null) {
@@ -89,7 +89,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
assert valBytes != null;
- Object val = proc.unmarshal(ctx, valBytes,
kernalCtx.config().isPeerClassLoadingEnabled() ?
+ Object val = valueFromValueBytes(ctx,
kernalCtx.config().isPeerClassLoadingEnabled() ?
kernalCtx.cache().context().deploy().globalLoader() : null);
if (ctx.storeValue())
@@ -105,7 +105,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
/** {@inheritDoc} */
@Override public byte[] valueBytes(CacheObjectValueContext ctx) throws
IgniteCheckedException {
if (valBytes == null)
- valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
+ valBytes = valueBytesFromValue(ctx);
return valBytes;
}
@@ -115,7 +115,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
assert val != null || valBytes != null;
if (valBytes == null)
- valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
+ valBytes = valueBytesFromValue(ctx);
}
/** {@inheritDoc} */
@@ -123,7 +123,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
assert val != null || valBytes != null;
if (val == null && ctx.storeValue())
- val = ctx.kernalContext().cacheObjects().unmarshal(ctx, valBytes,
ldr);
+ val = valueFromValueBytes(ctx, ldr);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectTransformerUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectTransformerUtils.java
new file mode 100644
index 00000000000..d6db1e0f72c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectTransformerUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.events.CacheObjectTransformedEvent;
+import
org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_TRANSFORMED;
+import static
org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED;
+
+/** */
+public class CacheObjectTransformerUtils {
+ /** */
+ private static CacheObjectTransformerManager
transformer(CacheObjectValueContext ctx) {
+ return ctx.kernalContext().cache().context().transformer();
+ }
+
+ /**
+ * Transforms bytes according to {@link CacheObjectTransformerManager}
when specified.
+ * @param bytes Given bytes.
+ * @param ctx Context.
+ * @return Transformed bytes.
+ */
+ public static byte[] transformIfNecessary(byte[] bytes,
CacheObjectValueContext ctx) {
+ return transformIfNecessary(bytes, 0, bytes.length, ctx);
+ }
+
+ /**
+ * Transforms bytes according to {@link CacheObjectTransformerManager}
when specified.
+ * @param bytes Given bytes.
+ * @param ctx Context.
+ * @return Transformed bytes.
+ */
+ public static byte[] transformIfNecessary(byte[] bytes, int offset, int
length, CacheObjectValueContext ctx) {
+ assert bytes[offset] != TRANSFORMED;
+
+ CacheObjectTransformerManager transformer = transformer(ctx);
+
+ if (transformer == null)
+ return bytes;
+
+ ByteBuffer src = ByteBuffer.wrap(bytes, offset, length);
+ ByteBuffer transformed = transformer.transform(src);
+
+ if (transformed != null) {
+ assert transformed.remaining() > 0 : transformed.remaining();
+
+ byte[] res = toArray(transformed);
+
+ if
(ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
+ ctx.kernalContext().event().record(
+ new
CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
+ "Object transformed",
+ EVT_CACHE_OBJECT_TRANSFORMED,
+ detachIfNecessary(bytes, offset, length),
+ res,
+ false));
+ }
+
+ return res;
+ }
+ else {
+ byte[] res = detachIfNecessary(bytes, offset, length);
+
+ if
(ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
+ ctx.kernalContext().event().record(
+ new
CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
+ "Object transformation was cancelled.",
+ EVT_CACHE_OBJECT_TRANSFORMED,
+ res,
+ res,
+ false));
+ }
+
+ return res;
+ }
+ }
+
+ /**
+ *
+ */
+ private static byte[] detachIfNecessary(byte[] bytes, int offset, int
length) {
+ if (offset == 0 && length == bytes.length)
+ return bytes;
+
+ byte[] res = new byte[length];
+
+ U.arrayCopy(bytes, offset, res, 0, length);
+
+ return res;
+ }
+
+ /**
+ * Restores transformed bytes if necessary.
+ * @param bytes Given bytes.
+ * @param ctx Context.
+ * @return Restored bytes.
+ */
+ public static byte[] restoreIfNecessary(byte[] bytes,
CacheObjectValueContext ctx) {
+ if (bytes[0] != TRANSFORMED)
+ return bytes;
+
+ CacheObjectTransformerManager transformer = transformer(ctx);
+
+ ByteBuffer src = ByteBuffer.wrap(bytes, 1, bytes.length - 1); //
Skipping TRANSFORMED.
+ ByteBuffer restored = transformer.restore(src);
+
+ byte[] res = toArray(restored);
+
+ if
(ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
+ ctx.kernalContext().event().record(
+ new
CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
+ "Object restored",
+ EVT_CACHE_OBJECT_TRANSFORMED,
+ res,
+ bytes,
+ true));
+ }
+
+ return res;
+ }
+
+ /**
+ * @param buf Buffer.
+ */
+ private static byte[] toArray(ByteBuffer buf) {
+ if (buf.isDirect()) {
+ byte[] res = new byte[buf.remaining()];
+
+ buf.get(res);
+
+ return res;
+ }
+ else {
+ if (buf.remaining() != buf.capacity())
+ throw new IllegalStateException("Unexpected Heap Byte Buffer
state. " +
+ "Wrapped array must contain the data without any offsets
to avoid unnecessary copying. " +
+ "Position must be 0, limit must be equal to the capacity."
+
+ " [buf=" + buf + "]");
+
+ return buf.array();
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 893893b8edd..518c8aa8989 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import
org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -3063,6 +3064,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
if (snpMgr == null)
snpMgr = new IgniteCacheSnapshotManager();
+ CacheObjectTransformerManager transMgr =
ctx.plugins().createComponent(CacheObjectTransformerManager.class);
+
GridCacheIoManager ioMgr = new GridCacheIoManager();
CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
GridCacheSharedTtlCleanupManager ttl = new
GridCacheSharedTtlCleanupManager();
@@ -3097,7 +3100,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
storeSesLsnrs,
mvccCachingMgr,
deadlockDetectionMgr,
- diagnosticMgr
+ diagnosticMgr,
+ transMgr
);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 17c77ed46a2..b14064dafc1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,6 +36,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import
org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -147,6 +148,9 @@ public class GridCacheSharedContext<K, V> {
/** Deadlock detection manager. */
private DeadlockDetectionManager deadlockDetectionMgr;
+ /** Cache objects transformation manager. */
+ private CacheObjectTransformerManager transMgr;
+
/** Cache contexts map. */
private final ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap;
@@ -237,7 +241,8 @@ public class GridCacheSharedContext<K, V> {
Collection<CacheStoreSessionListener> storeSesLsnrs,
MvccCachingManager mvccCachingMgr,
DeadlockDetectionManager deadlockDetectionMgr,
- CacheDiagnosticManager diagnosticMgr
+ CacheDiagnosticManager diagnosticMgr,
+ CacheObjectTransformerManager transMgr
) {
this.kernalCtx = kernalCtx;
@@ -262,7 +267,8 @@ public class GridCacheSharedContext<K, V> {
evictMgr,
mvccCachingMgr,
deadlockDetectionMgr,
- diagnosticMgr
+ diagnosticMgr,
+ transMgr
);
this.storeSesLsnrs = storeSesLsnrs;
@@ -444,7 +450,8 @@ public class GridCacheSharedContext<K, V> {
evictMgr,
mvccCachingMgr,
deadlockDetectionMgr,
- diagnosticMgr
+ diagnosticMgr,
+ transMgr
);
this.mgrs = mgrs;
@@ -495,7 +502,8 @@ public class GridCacheSharedContext<K, V> {
PartitionsEvictManager evictMgr,
MvccCachingManager mvccCachingMgr,
DeadlockDetectionManager deadlockDetectionMgr,
- CacheDiagnosticManager diagnosticMgr
+ CacheDiagnosticManager diagnosticMgr,
+ CacheObjectTransformerManager transMgr
) {
this.diagnosticMgr = add(mgrs, diagnosticMgr);
this.mvccMgr = add(mgrs, mvccMgr);
@@ -520,6 +528,7 @@ public class GridCacheSharedContext<K, V> {
this.evictMgr = add(mgrs, evictMgr);
this.mvccCachingMgr = add(mgrs, mvccCachingMgr);
this.deadlockDetectionMgr = add(mgrs, deadlockDetectionMgr);
+ this.transMgr = add(mgrs, transMgr);
}
/**
@@ -903,6 +912,13 @@ public class GridCacheSharedContext<K, V> {
return deadlockDetectionMgr;
}
+ /**
+ * @return Cache objects transformation manager.
+ */
+ public CacheObjectTransformerManager transformer() {
+ return transMgr;
+ }
+
/**
* @return Node ID.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index e6c3636ce89..db8bb33c8b4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -1379,7 +1379,7 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
@Override public CacheObject toCacheObject(CacheObjectContext ctx, byte
type, byte[] bytes) {
switch (type) {
case BinaryObjectImpl.TYPE_BINARY:
- return new BinaryObjectImpl(binaryContext(), bytes, 0);
+ return new BinaryObjectImpl(binaryContext(), bytes, ctx);
case BinaryObjectImpl.TYPE_BINARY_ENUM:
return new BinaryEnumObjectImpl(binaryContext(), bytes);
@@ -1399,7 +1399,7 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
throws IgniteCheckedException {
switch (type) {
case BinaryObjectImpl.TYPE_BINARY:
- return new BinaryObjectImpl(binaryContext(), bytes, 0);
+ return new BinaryObjectImpl(binaryContext(), bytes);
case CacheObject.TYPE_BYTE_ARR:
throw new IllegalArgumentException("Byte arrays cannot be used
as cache keys.");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 1e5a8a74d78..926ac272e14 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -2498,7 +2498,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
GridCacheSharedContext<?, ?> sctx = new GridCacheSharedContext<>(ctx,
null, null, null,
null, null, null, null, null, null,
null, null, null, null, null,
- null, null, null, null, null, null);
+ null, null, null, null, null, null, null);
return new DataPageIterator(sctx, coctx, pageStore, partId);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 191d3c9987c..ef998e3aca2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -405,7 +405,7 @@ public class IgniteWalIteratorFactory {
kernalCtx, null, null, null,
null, null, null, dbMgr, null, null,
null, null, null, null, null,
- null, null, null, null, null, null
+ null, null, null, null, null, null, null
);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
index 241c12b2745..94050f25940 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
@@ -59,16 +59,15 @@ public class UserCacheObjectImpl extends CacheObjectImpl {
IgniteCacheObjectProcessor proc =
ctx.kernalContext().cacheObjects();
if (valBytes == null)
- valBytes = proc.marshal(ctx, val);
+ valBytes = valueBytesFromValue(ctx);
if (ctx.storeValue()) {
boolean p2pEnabled =
ctx.kernalContext().config().isPeerClassLoadingEnabled();
ClassLoader ldr = p2pEnabled ?
- IgniteUtils.detectClass(this.val).getClassLoader() :
val.getClass().getClassLoader();
+ IgniteUtils.detectClass(val).getClassLoader() :
val.getClass().getClassLoader();
- Object val = this.val != null && proc.immutable(this.val) ?
this.val :
- proc.unmarshal(ctx, valBytes, ldr);
+ Object val = this.val != null && proc.immutable(this.val) ?
this.val : valueFromValueBytes(ctx, ldr);
return new CacheObjectImpl(val, valBytes);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index cf7b890f5d6..f03a64e8187 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -184,6 +184,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends
GridCommonAbstractTest {
null,
null,
null,
+ null,
null)
).createSerializer(serVer);
@@ -480,6 +481,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends
GridCommonAbstractTest {
null,
null,
null,
+ null,
null
);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 290e8f9e56f..93a5ea9bd09 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -99,7 +99,8 @@ public class BPlusTreePageMemoryImplTest extends
BPlusTreeSelfTest {
null,
null,
null,
- new CacheDiagnosticManager()
+ new CacheDiagnosticManager(),
+ null
);
IgniteOutClosure<CheckpointProgress> clo = new
IgniteOutClosure<CheckpointProgress>() {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 668941f65a8..c7d5a92696b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -98,6 +98,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends
BPlusTreeReuseSelfTest
null,
null,
null,
+ null,
null
);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index 1504bef1f65..14d1380acd5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -114,7 +114,8 @@ public class IndexStoragePageMemoryImplTest extends
IndexStorageSelfTest {
null,
null,
null,
- new CacheDiagnosticManager()
+ new CacheDiagnosticManager(),
+ null
);
IgniteOutClosure<CheckpointProgress> clo = () ->
Mockito.mock(CheckpointProgressImpl.class);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 6e837ba7d6b..1f2cc68f366 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -103,6 +103,7 @@ public class PageMemoryImplNoLoadTest extends
PageMemoryNoLoadSelfTest {
null,
null,
null,
+ null,
null
);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index caa74caf74c..94b79039cb0 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -633,6 +633,7 @@ public class PageMemoryImplTest extends
GridCommonAbstractTest {
null,
null,
null,
+ null,
null
);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectTransformationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectTransformationTest.java
new file mode 100644
index 00000000000..fdfe14b5c69
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/AbstractCacheObjectTransformationTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheObjectTransformedEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static
org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED;
+
+/**
+ *
+ */
+public abstract class AbstractCacheObjectTransformationTest extends
GridCommonAbstractTest {
+ /** Cache name. */
+ protected static final String CACHE_NAME = "data";
+
+ /** Nodes count. */
+ protected static final int NODES = 3;
+
+ /** Key. */
+ protected int key;
+
+ /** Event queue. */
+ protected final ConcurrentLinkedDeque<CacheObjectTransformedEvent>
evtQueue = new ConcurrentLinkedDeque<>();
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(cacheConfiguration());
+ cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_TRANSFORMED);
+
+ return cfg;
+ }
+
+ /**
+ * Gets cache configuration.
+ *
+ * @return Data cache configuration.
+ */
+ protected CacheConfiguration<?, ?> cacheConfiguration() {
+ CacheConfiguration<?, ?> cfg = defaultCacheConfiguration();
+
+ cfg.setName(CACHE_NAME);
+ cfg.setBackups(NODES);
+ cfg.setReadFromBackup(true);
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setAffinity(new RendezvousAffinityFunction(false, 1)); //
Simplifies event calculation.
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ protected Ignite prepareCluster() throws Exception {
+ Ignite ignite = startGrids(NODES);
+
+ awaitPartitionMapExchange();
+
+ ignite.events().remoteListen(
+ (uuid, evt) -> {
+ assertTrue(evt instanceof CacheObjectTransformedEvent);
+
+ evtQueue.add((CacheObjectTransformedEvent)evt);
+
+ return true;
+ },
+ null,
+ EventType.EVT_CACHE_OBJECT_TRANSFORMED);
+
+ return ignite;
+ }
+
+ /**
+ *
+ */
+ protected void putAndGet(Object val, boolean transformableVal, boolean
reversed) {
+ boolean binarizableVal = !(val instanceof String || val instanceof
Integer || val instanceof Object[] ||
+ val instanceof int[] || val instanceof Collection);
+
+ Object k = reversed ? val : ++key;
+ Object v = reversed ? ++key : val;
+
+ putWithCheck(k, v, !reversed && binarizableVal, transformableVal);
+ getWithCheck(k, v, transformableVal);
+ }
+
+ /**
+ *
+ */
+ private void putWithCheck(Object key, Object val, boolean binarizableVal,
boolean transformableVal) {
+ Ignite node = backupNode(0, CACHE_NAME); // Any key, because of single
partition.
+
+ IgniteCache<Object, Object> cache = node.getOrCreateCache(CACHE_NAME);
+
+ cache.put(key, val);
+
+ int transformed = transformableVal ? 1 : 0;
+ int transformCancelled = transformableVal ? 0 : 1;
+ int restored = transformableVal && binarizableVal ? NODES : 0; //
Binary array is required (e.g. to wait for proper Metadata)
+
+ checkEvents(transformed, transformCancelled, restored,
transformableVal);
+ }
+
+ /**
+ *
+ */
+ private void getWithCheck(Object key, Object expVal, boolean
transformableVal) {
+ for (Ignite node : G.allGrids()) {
+ for (boolean keepBinary : new boolean[] {true, false})
+ getWithCheck(node, key, expVal, transformableVal, keepBinary);
+ }
+ }
+
+ /**
+ *
+ */
+ private void getWithCheck(Ignite node, Object key, Object expVal, boolean
transformableVal, boolean keepBinary) {
+ IgniteCache<Object, Object> cache = node.getOrCreateCache(CACHE_NAME);
+
+ if (keepBinary)
+ cache = cache.withKeepBinary();
+
+ Object obj = cache.get(key);
+
+ CacheObjectContext coCtx = ((IgniteCacheProxy<?,
?>)cache).context().cacheObjectContext();
+
+ expVal = coCtx.unwrapBinaryIfNeeded(expVal, false, true, null);
+ obj = coCtx.unwrapBinaryIfNeeded(obj, false, true, null);
+
+ assertEqualsArraysAware(expVal, obj);
+
+ int restored = transformableVal ? 1 : 0; // Value restored.
+
+ checkEvents(0, 0, restored, transformableVal);
+ }
+
+ /**
+ *
+ */
+ private void checkEvents(int transformed, int transformCancelled, int
restored, boolean transformableVal) {
+ for (int i = transformed + transformCancelled + restored; i > 0; i--) {
+ CacheObjectTransformedEvent evt = event();
+
+ if (evt.isRestore())
+ restored--;
+ else {
+ boolean arrEqual = Arrays.equals(evt.getOriginal(),
evt.getTransformed());
+
+ assertEquals(transformableVal, !arrEqual);
+
+ if (!arrEqual)
+ transformed--;
+ else
+ transformCancelled--;
+ }
+ }
+
+ assertEquals(0, transformed);
+ assertEquals(0, transformCancelled);
+ assertEquals(0, restored);
+
+ checkEventsAbsent();
+ }
+
+ /**
+ *
+ */
+ private void checkEventsAbsent() {
+ assertTrue(evtQueue.size() + " unhandled events", evtQueue.isEmpty());
+ }
+
+ /**
+ *
+ */
+ private CacheObjectTransformedEvent event() {
+ CacheObjectTransformedEvent evt = null;
+
+ while (evt == null)
+ evt = evtQueue.poll();
+
+ return evt;
+ }
+
+ /**
+ *
+ */
+ protected static final class BinarizableData {
+ /** String. */
+ String str;
+
+ /** List. */
+ List<Object> list;
+
+ /** Int. */
+ Integer i;
+
+ /** Data. */
+ BinarizableData data;
+
+ /** */
+ public BinarizableData(String str, List<Object> list, Integer i) {
+ this.str = str;
+ this.list = list;
+ this.i = i;
+ }
+
+ /** */
+ public BinarizableData(String str, List<Object> list, Integer i,
BinarizableData data) {
+ this.str = str;
+ this.list = list;
+ this.i = i;
+ this.data = data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ BinarizableData data = (BinarizableData)o;
+
+ return Objects.equals(str, data.str) && Objects.equals(list,
data.list) && Objects.equals(i, data.i);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(str, list, i);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static final class ControllableCacheObjectTransformer extends
TestCacheObjectTransformerManagerAdapter {
+ /** Shift. */
+ private static volatile int shift;
+
+ /** Transformation counter. */
+ public static final Map<Integer, AtomicInteger> tCntr = new
ConcurrentHashMap<>();
+
+ /** Restoration counter. */
+ public static final Map<Integer, AtomicInteger> rCntr = new
ConcurrentHashMap<>();
+
+ /**
+ *
+ */
+ public static void transformationShift(int shift) {
+ ControllableCacheObjectTransformer.shift = shift;
+ }
+
+ /**
+ *
+ */
+ public static boolean failOnTransformation() {
+ return shift == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer transform(ByteBuffer original) {
+ if (failOnTransformation())
+ return null;
+
+ tCntr.computeIfAbsent(shift, key -> new
AtomicInteger()).incrementAndGet();
+
+ ByteBuffer transformed = ByteBuffer.wrap(new
byte[original.remaining() + 5]);
+
+ transformed.put(TRANSFORMED);
+ transformed.putInt(shift);
+
+ while (original.hasRemaining())
+ transformed.put((byte)(original.get() + shift));
+
+ transformed.flip();
+
+ return transformed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer restore(ByteBuffer transformed) {
+ ByteBuffer restored = ByteBuffer.wrap(new
byte[transformed.remaining() - 4]);
+
+ int origShift = transformed.getInt();
+
+ rCntr.computeIfAbsent(origShift, key -> new
AtomicInteger()).incrementAndGet();
+
+ while (transformed.hasRemaining())
+ restored.put((byte)(transformed.get() - origShift));
+
+ restored.flip();
+
+ return restored;
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationEvolutionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationEvolutionTest.java
new file mode 100644
index 00000000000..5fd6bcade75
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationEvolutionTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Checks transformation algorithm change.
+ */
+@RunWith(Parameterized.class)
+public class CacheObjectTransformationEvolutionTest extends
AbstractCacheObjectTransformationTest {
+ /** Atomicity mode. */
+ @Parameterized.Parameter
+ public CacheAtomicityMode mode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "mode={0}")
+ public static Collection<?> parameters() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (CacheAtomicityMode mode : new CacheAtomicityMode[]
{CacheAtomicityMode.TRANSACTIONAL, CacheAtomicityMode.ATOMIC})
+ res.add(new Object[] {mode});
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration<?, ?> cacheConfiguration() {
+ CacheConfiguration<?, ?> cfg = super.cacheConfiguration();
+
+ cfg.setAtomicityMode(mode);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setPluginProviders(
+ new TestCacheObjectTransformerPluginProvider(new
ControllableCacheObjectTransformer()));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ ControllableCacheObjectTransformer.tCntr.clear();
+ ControllableCacheObjectTransformer.rCntr.clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInt() throws Exception {
+ doTest(i -> i);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testString() throws Exception {
+ doTest(String::valueOf);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testObject() throws Exception {
+ doTest(i -> new BinarizableData(String.valueOf(i), null, i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryObject() throws Exception {
+ doTest(i -> {
+ BinaryObjectBuilder builder =
grid(0).binary().builder(BinarizableData.class.getName());
+
+ builder.setField("str", String.valueOf(i));
+ builder.setField("i", i);
+
+ return builder.build();
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest(Function<Integer, Object> kvGen) throws Exception {
+ prepareCluster();
+
+ IgniteCache<Object, Object> cache =
+ primaryNode(0/*any*/,
CACHE_NAME).getOrCreateCache(AbstractCacheObjectTransformationTest.CACHE_NAME);
+
+ int cnt = 100;
+
+ int totalCnt = 0;
+ int transformCnt = 0;
+ int restoreCnt = 0;
+
+ int[] shifts = new int[] {-3, 0/*disabled*/, 7, 42};
+
+ ThreadLocalRandom rdm = ThreadLocalRandom.current();
+
+ Object obj = kvGen.apply(0);
+
+ boolean binarizable = obj instanceof BinarizableData;
+ boolean binary = obj instanceof BinaryObject;
+
+ // Regular put, brandnew value.
+ for (int i = 0; i < cnt; i++) {
+ for (int shift : shifts) {
+ ControllableCacheObjectTransformer.transformationShift(shift);
+
+ Object kv = kvGen.apply(++key);
+
+ cache.put(kv, kv);
+ }
+ }
+
+ transformCnt += cnt; // Put on primary.
+
+ if (binarizable || binary) // Binary array is required at backups at
put (e.g. to wait for proper Metadata)
+ restoreCnt += (NODES - 1) * cnt; // Put on backups.
+
+ totalCnt += cnt;
+
+ // Already used value.
+ for (int i = 0; i < cnt; i++) {
+ for (int shift : shifts) {
+ ControllableCacheObjectTransformer.transformationShift(-shift);
+
+ Object kv = kvGen.apply(++key);
+
+ cache.put(-key, kv); // Initial put causes value
transformation.
+
+ ControllableCacheObjectTransformer.transformationShift(shift);
+
+ cache.put(kv, kv); // Using already transformed value as a key
and as a value.
+ }
+ }
+
+ if (!binary) { // Binary value already marshalled using the previous
shift.
+ transformCnt += cnt; // Put on primary.
+ totalCnt += cnt;
+ }
+
+ if (binarizable)
+ // Will be transformed (and restored!) using the actual shift,
while BinaryObject will keep the previous transformation result.
+ restoreCnt += (NODES - 1) * cnt; // Put on backups.
+
+ // Value got from the cache.
+ for (int i = 0; i < cnt; i++) {
+ for (int shift : shifts) {
+ ControllableCacheObjectTransformer.transformationShift(-shift);
+
+ Object kv = kvGen.apply(++key);
+
+ cache.put(-key, kv);
+
+ Object kv0 = cache.withKeepBinary().get(-key);
+
+ ControllableCacheObjectTransformer.transformationShift(shift);
+
+ cache.put(kv0, kv0); // Using the value which was obtained
from the cache as a key and as a value.
+ }
+ }
+
+ if (!binary && !binarizable) { // Binary and binarizable (objects)
value already marshalled using the previous shift.
+ transformCnt += cnt; // Put on primary.
+ totalCnt += cnt;
+ }
+
+ // Value replace.
+ for (int i = 0; i < cnt; i++) {
+ for (int shift : shifts) {
+ ControllableCacheObjectTransformer.transformationShift(-shift);
+
+ Object kv = kvGen.apply(++key);
+
+ cache.put(kv, kv);
+
+ ControllableCacheObjectTransformer.transformationShift(shift);
+
+ Object kv2 = kvGen.apply(key); // Brandnew kv to guarantee
transtormation.
+
+ assertEquals(kv, kv2);
+
+ assertTrue(cache.replace(kv2, kv2, -42)); // Replacing kv via
kv2 (same object, but different bytes).
+
+ assertTrue(cache.replace(kv, kvGen.apply(key))); // Replacing
-42 with generated value.
+ }
+ }
+
+ if (mode == CacheAtomicityMode.TRANSACTIONAL) // Values at replace
required to be marshalled
+ transformCnt += cnt * 3; // [kv2 (as a value), -42, generated].
+ else
+ transformCnt += cnt * 2; // [-42, generated]. Atomic operation
compares with previous values without transformaton.
+
+ if (binarizable || binary) { // Binary array is required at backups at
put (e.g. to wait for proper Metadata)
+ if (mode == CacheAtomicityMode.TRANSACTIONAL)
+ restoreCnt += (NODES - 1) * cnt * 2; // Double replace on
backups (restoration of both transfered binary objects).
+ else
+ restoreCnt += (NODES - 1) * cnt; // Previous value will not be
transfered to backups (only generated).
+ }
+
+ totalCnt += cnt;
+
+ // Value replace via Entry Processor.
+ for (int i = 0; i < cnt; i++) {
+ for (int shift : shifts) {
+ ControllableCacheObjectTransformer.transformationShift(-shift);
+
+ Object kv = kvGen.apply(++key);
+
+ cache.put(kv, kv);
+
+ ControllableCacheObjectTransformer.transformationShift(shift);
+
+ Object kv2 = kvGen.apply(key); // Brandnew kv to guarantee
transtormation.
+
+ assertEquals(kv, kv2);
+
+ assertEquals(Integer.valueOf(-42),
+ cache.invoke(kv2, (e, args) -> {
+ Object val = e.getValue();
+ Object exp = kv2;
+
+ if (binary)
+ exp = ((BinaryObject)exp).deserialize();
+
+ if (Objects.equals(exp, val))
+ e.setValue(-42); // Replacing kv via kv2 (same
object, but different bytes).
+
+ return e.getValue();
+ }));
+
+ Object kv3 = kvGen.apply(key);
+
+ cache.invoke(kv, (e, args) -> {
+ e.setValue(kv3); // Replacing -42 with generated value.
+
+ return null;
+ });
+ }
+ }
+
+ if (mode == CacheAtomicityMode.TRANSACTIONAL)
+ transformCnt += NODES * cnt * 2; // [-42, generated], each node.
+ else
+ transformCnt += cnt * 2; // [-42, generated], primary.
+
+ if (binarizable || binary) { // Binary array is required at backups at
put (e.g. to wait for proper Metadata)
+ if (mode == CacheAtomicityMode.ATOMIC)
+ restoreCnt += (NODES - 1) * cnt; // kv3 will be restored as a
part of the update message at backups.
+ }
+
+ totalCnt += cnt;
+
+ // Checking.
+ for (int shift : shifts) {
+ if (shift != 0)
+ assertEquals(transformCnt,
ControllableCacheObjectTransformer.tCntr.get(shift).get());
+ else
+
assertNull(ControllableCacheObjectTransformer.tCntr.get(shift));
+
+ if (shift != 0 && restoreCnt > 0)
+ assertEquals(restoreCnt,
ControllableCacheObjectTransformer.rCntr.get(shift).get());
+ else
+
assertNull(ControllableCacheObjectTransformer.rCntr.get(shift));
+ }
+
+ ControllableCacheObjectTransformer.tCntr.clear();
+ ControllableCacheObjectTransformer.rCntr.clear();
+
+ if (binary)
+ cache = cache.withKeepBinary();
+
+ // Get (hits/misses).
+ while (key > 0) {
+
ControllableCacheObjectTransformer.transformationShift(rdm.nextInt()); //
Random transformation.
+
+ Object kv = kvGen.apply(key--);
+ Object val = cache.get(kv);
+
+ assertEquals(kv, val);
+
+ if (binary) {
+ Object dKv = ((BinaryObject)kv).deserialize();
+ Object dVal = ((BinaryObject)val).deserialize();
+
+ assertEquals(dKv, dVal);
+ }
+ }
+
+ // Checking.
+ for (int shift : shifts) {
+ assertNull(ControllableCacheObjectTransformer.tCntr.get(shift));
// No key transformations at used shifts.
+ assertEquals(0, ControllableCacheObjectTransformer.tCntr.size());
// No key transformations at all.
+
+ if (shift != 0)
+ assertEquals(totalCnt,
ControllableCacheObjectTransformer.rCntr.get(shift).get());
+ else
+
assertNull(ControllableCacheObjectTransformer.rCntr.get(shift));
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationTest.java
new file mode 100644
index 00000000000..04cb3635aed
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectTransformationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class CacheObjectTransformationTest extends
AbstractCacheObjectTransformationTest {
+ /** Atomicity mode. */
+ @Parameterized.Parameter
+ public CacheAtomicityMode mode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "mode={0}")
+ public static Collection<?> parameters() {
+ List<Object[]> res = new ArrayList<>();
+
+ for (CacheAtomicityMode mode : new CacheAtomicityMode[]
{CacheAtomicityMode.TRANSACTIONAL, CacheAtomicityMode.ATOMIC})
+ res.add(new Object[] {mode});
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setPluginProviders(
+ new TestCacheObjectTransformerPluginProvider(new
ControllableCacheObjectTransformer()));
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration<?, ?> cacheConfiguration() {
+ CacheConfiguration<?, ?> cfg = super.cacheConfiguration();
+
+ cfg.setAtomicityMode(mode);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTransformable() throws Exception {
+ ControllableCacheObjectTransformer.transformationShift(42);
+
+ assertFalse(ControllableCacheObjectTransformer.failOnTransformation());
+
+ doTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUntransformable() throws Exception {
+ ControllableCacheObjectTransformer.transformationShift(0); // Fail on
transformation.
+
+ assertTrue(ControllableCacheObjectTransformer.failOnTransformation());
+
+ doTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest() throws Exception {
+ Ignite ignite = prepareCluster();
+
+ int i = -42; // Avoiding intersection with an incremental key.
+ int[] is = new int[] {i, i, i};
+ List<Integer> iList = Lists.newArrayList(i, i, i);
+
+ putAndGet(i);
+ putAndGet(is);
+ putAndGet(iList);
+
+ String str = "test";
+ String[] strs = new String[] {str, str, str};
+ List<String> strList = Lists.newArrayList(str, str, str);
+
+ putAndGet(str);
+ putAndGet(strs);
+ putAndGet(strList);
+
+ BinarizableData data = new BinarizableData(str, Lists.newArrayList(i,
i, i), i);
+ BinarizableData[] datas = new BinarizableData[] {data, data, data};
+ List<BinarizableData> dataList = Lists.newArrayList(data, data, data);
+
+ putAndGet(data);
+ putAndGet(datas);
+ putAndGet(dataList);
+
+ BinarizableData ddata = new BinarizableData(str, Lists.newArrayList(i,
i, i), i, data);
+ BinarizableData[] ddatas = new BinarizableData[] {ddata, ddata, ddata};
+ List<BinarizableData> ddataList = Lists.newArrayList(ddata, ddata,
ddata);
+
+ putAndGet(ddata);
+ putAndGet(ddatas);
+ putAndGet(ddataList);
+
+ BinaryObjectBuilder builder =
ignite.binary().builder(BinarizableData.class.getName());
+
+ builder.setField("str", str + "!");
+ builder.setField("list", Lists.newArrayList(i, i));
+ builder.setField("i", i);
+
+ BinaryObject bo = builder.build();
+ BinaryObject[] bos = new BinaryObject[] {bo, bo, bo};
+ List<BinaryObject> boList = Lists.newArrayList(bo, bo, bo);
+
+ putAndGet(bo);
+ putAndGet(bos);
+ putAndGet(boList);
+
+ builder.setField("data", data);
+
+ BinaryObject dbo = builder.build();
+ BinaryObject[] dbos = new BinaryObject[] {dbo, dbo, dbo};
+ List<BinaryObject> dboList = Lists.newArrayList(dbo, dbo, dbo);
+
+ putAndGet(dbo);
+ putAndGet(dbos);
+ putAndGet(dboList);
+ }
+
+ /**
+ * @param val Value.
+ */
+ private void putAndGet(Object val) {
+ for (boolean reversed : new boolean[] {true, false})
+ putAndGet(val,
!ControllableCacheObjectTransformer.failOnTransformation(), reversed);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerManagerAdapter.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerManagerAdapter.java
new file mode 100644
index 00000000000..25944ef2956
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerManagerAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import org.apache.ignite.IgniteCheckedException;
+import
org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ *
+ */
+public abstract class TestCacheObjectTransformerManagerAdapter implements
CacheObjectTransformerManager {
+ /** {@inheritDoc} */
+ @Override public void start(GridCacheSharedContext cctx) throws
IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart(boolean active) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean active) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ // No-op.
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerPluginProvider.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerPluginProvider.java
new file mode 100644
index 00000000000..0db5d192308
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transform/TestCacheObjectTransformerPluginProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transform;
+
+import
org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class TestCacheObjectTransformerPluginProvider extends
AbstractTestPluginProvider {
+ /** Manager. */
+ private final CacheObjectTransformerManager mgr;
+
+ /**
+ * @param mgr Manager.
+ */
+ public
TestCacheObjectTransformerPluginProvider(CacheObjectTransformerManager mgr) {
+ this.mgr = mgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "TestCacheObjectTransformPlugin";
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T createComponent(PluginContext ctx,
Class<T> cls) {
+ if (CacheObjectTransformerManager.class.equals(cls))
+ return (T)mgr;
+
+ return null;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0bb13e9710d..4bb9cebb92e 100644
---
a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++
b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
+
import static
org.apache.ignite.testframework.junits.GridAbstractTest.defaultCacheConfiguration;
/**
@@ -85,7 +86,8 @@ public class GridCacheTestContext<K, V> extends
GridCacheContext<K, V> {
null,
null,
null,
- new CacheDiagnosticManager()
+ new CacheDiagnosticManager(),
+ null
),
defaultCacheConfiguration(),
null,
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
index 7b647bd94e6..6e169d74851 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
@@ -42,6 +42,8 @@ import
org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomi
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.RebalanceStatisticsTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxAsyncOpsSemaphorePermitsExceededTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRecoveryOnCoordniatorFailTest;
+import
org.apache.ignite.internal.processors.cache.transform.CacheObjectTransformationEvolutionTest;
+import
org.apache.ignite.internal.processors.cache.transform.CacheObjectTransformationTest;
import
org.apache.ignite.internal.processors.cluster.ClusterNameBeforeActivation;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -101,6 +103,9 @@ public class IgniteCacheTestSuite13 {
GridTestUtils.addTestIfNeeded(suite,
TxAsyncOpsSemaphorePermitsExceededTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CacheObjectTransformationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CacheObjectTransformationEvolutionTest.class, ignoredTests);
+
return suite;
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryCacheKeyValueTransformedFieldsTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryCacheKeyValueTransformedFieldsTest.java
new file mode 100644
index 00000000000..60da8048b97
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryCacheKeyValueTransformedFieldsTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import
org.apache.ignite.internal.processors.cache.transform.TestCacheObjectTransformerManagerAdapter;
+import
org.apache.ignite.internal.processors.cache.transform.TestCacheObjectTransformerPluginProvider;
+
+import static
org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED;
+
+/** Test checks that indexing works (including inlining) with enabled cache
objects transformer. */
+public class IndexQueryCacheKeyValueTransformedFieldsTest extends
IndexQueryCacheKeyValueFieldsTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ return super.getConfiguration(instanceName).setPluginProviders(
+ new TestCacheObjectTransformerPluginProvider(new
RandomShiftCacheObjectTransformer()));
+ }
+
+ /**
+ * Transforms each object with a random shift.
+ */
+ protected static final class RandomShiftCacheObjectTransformer extends
TestCacheObjectTransformerManagerAdapter {
+ /** {@inheritDoc} */
+ @Override public ByteBuffer transform(ByteBuffer original) {
+ ByteBuffer transformed = ByteBuffer.wrap(new
byte[original.remaining() + 5]);
+
+ int shift = ThreadLocalRandom.current().nextInt();
+
+ transformed.put(TRANSFORMED);
+ transformed.putInt(shift);
+
+ while (original.hasRemaining())
+ transformed.put((byte)(original.get() + shift));
+
+ transformed.flip();
+
+ return transformed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer restore(ByteBuffer transformed) {
+ ByteBuffer restored = ByteBuffer.wrap(new
byte[transformed.remaining() - 4]);
+
+ int shift = transformed.getInt();
+
+ while (transformed.hasRemaining())
+ restored.put((byte)(transformed.get() - shift));
+
+ restored.flip();
+
+ return restored;
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index 754fc2d2361..3d408c06d2f 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -38,6 +38,7 @@ import org.junit.runners.Suite;
IndexQueryPartitionTest.class,
IndexQueryCacheKeyValueFieldsTest.class,
IndexQueryCacheKeyValueEscapedFieldsTest.class,
+ IndexQueryCacheKeyValueTransformedFieldsTest.class,
IndexQueryWrongIndexTest.class,
MultifieldIndexQueryTest.class,
MultiTableIndexQuery.class,
diff --git a/parent/pom.xml b/parent/pom.xml
index 7cb536d85e2..1cb6d874467 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -137,7 +137,7 @@
<scala.test.version>2.2.6</scala.test.version>
<slf4j.version>1.7.33</slf4j.version>
<slf4j16.version>1.6.4</slf4j16.version>
- <snappy.version>1.1.7.2</snappy.version>
+ <snappy.version>1.1.8.4</snappy.version>
<spring.version>5.2.22.RELEASE</spring.version>
<storm.version>1.1.1</storm.version>
<surefire.version>3.0.0-M4</surefire.version>
@@ -148,7 +148,7 @@
<yardstick.version>0.8.3</yardstick.version>
<zkclient.version>0.5</zkclient.version>
<zookeeper.version>3.6.3</zookeeper.version>
- <zstd.version>1.3.7-2</zstd.version>
+ <zstd.version>1.5.2-5</zstd.version>
<opencensus.version>0.22.0</opencensus.version>
<commons.lang3.version>3.9</commons.lang3.version>
<ignite-kafka-ext.version>1.0.0</ignite-kafka-ext.version>