ignite-2835: Fixed BinaryObjectOffheapImpl leakage to public code
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e85a7170 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e85a7170 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e85a7170 Branch: refs/heads/gridgain-7.5.11-vk Commit: e85a7170534cb66f40386cba689cfe632f4e66db Parents: ec2ec99 Author: ashutak <[email protected]> Authored: Tue Apr 5 14:56:01 2016 +0300 Committer: ashutak <[email protected]> Committed: Tue Apr 5 14:56:01 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 7 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../BinaryObjectOffHeapUnswapTemporaryTest.java | 370 +++++++++++++++++++ .../GridCacheBinaryObjectsAbstractSelfTest.java | 181 ++++++--- .../IgniteBinaryObjectsTestSuite.java | 4 +- 5 files changed, 510 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e85a7170/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5519070..83f5d5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import 
org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -898,9 +899,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (ret != null) + if (ret != null) { + assert tmp || !(ret instanceof BinaryObjectOffheapImpl); + // If return value is consistent, then done. return retVer ? new T2<>(ret, resVer) : ret; + } boolean loadedFromStore = false; @@ -974,6 +978,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } assert ret == null || !retVer; + assert tmp || !(ret instanceof BinaryObjectOffheapImpl); return ret; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e85a7170/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fef91c8..785e937 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1893,7 +1893,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /*unmarshal*/true, /*metrics*/true, /*events*/!skipVals, - /*temporary*/true, + /*temporary*/false, CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, resolveTaskName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e85a7170/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java new file mode 100644 index 0000000..4272a14 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; + +/** + * + */ +public class BinaryObjectOffHeapUnswapTemporaryTest extends GridCommonAbstractTest { + /** */ + private static final int CNT = 20; + + /** */ + @SuppressWarnings("serial") + private static final CacheEntryProcessor PROC = new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... 
arguments) throws EntryProcessorException { + return entry.getValue(); + } + }; + + /** */ + private CacheAtomicityMode atomicityMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshaller(new BinaryMarshaller()); + + return c; + } + + /** + * @param atomicityMode Atomicity mode. + * @param memoryMode Memory mode. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + this.atomicityMode = atomicityMode; + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setAtomicityMode(atomicityMode); + cfg.setMemoryMode(memoryMode); + cfg.setBackups(1); + cfg.setSwapEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValues() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(TRANSACTIONAL, OFFHEAP_VALUES)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testAtomicOffheapTiered() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(ATOMIC, OFFHEAP_TIERED)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValues() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(ATOMIC, OFFHEAP_VALUES)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * + */ + private void doTest() { + final IgniteCache<Integer, BinaryObject> cache = jcache(0).withKeepBinary(); + + for (int key = 0; key < CNT; key++) + jcache(0).put(key, new TestObject(key)); + + for (int key = CNT; key < 2 * CNT; key++) { + BinaryObjectBuilder builder = ignite(0).binary().builder("SomeType"); + builder.setField("field1", key); + builder.setField("field2", "name_" + key); + + cache.put(key, builder.build()); + } + + Set<Integer> keys = new LinkedHashSet<>(); + + for (int i = 0; i < 2 * CNT; i++) + keys.add(i); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.get(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.getEntry(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.getAndPut(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.getAndReplace(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.getAndPutIfAbsent(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + 
assertFalse(cache.localPeek(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.getAndRemove(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new TestCheck<Integer>() { + @Override public void check(Integer key) { + assertFalse(cache.invoke(key, PROC) instanceof BinaryObjectOffheapImpl); + } + }); + + // GetAll. + Map<Integer, BinaryObject> res = cache.getAll(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.getAll(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // GetAllOutTx. + res = cache.getAllOutTx(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.getAllOutTx(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // InvokeAll. 
+ res = cache.invokeAll(keys, PROC); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.invokeAll(keys, PROC); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // GetEntries. + Collection<CacheEntry<Integer, BinaryObject>> entries = cache.getEntries(keys); + + for (CacheEntry<Integer, BinaryObject> e : entries) + assertFalse(e.getValue() instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + entries = cache.getEntries(keys); + + for (CacheEntry<Integer, BinaryObject> e : entries) + assertFalse(e.getValue() instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + } + + /** + * + */ + private void check(TestCheck<Integer> checkOp) { + for (int key = 0; key < 2 * CNT; key++) { + checkOp.check(key); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + checkOp.check(key); + + tx.commit(); + } + } + } + } + } + } + + /** + * + */ + @SuppressWarnings("PackageVisibleField") + private static class TestObject { + /** */ + String field; + + /** */ + int field2; + + /** + * @param key Key. 
+ */ + TestObject(int key) { + field = "str" + key; + field2 = key; + } + } + + /** + * + */ + private static interface TestCheck<T> { + /** + * @param t Parameter. + */ + public void check(T t); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e85a7170/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java index 36a9450..f776146 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java @@ -29,9 +29,15 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteBinary; +import org.apache.ignite.IgniteCache; import org.apache.ignite.binary.BinaryNameMapper; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -41,7 +47,9 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import 
org.apache.ignite.internal.binary.BinaryContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectImpl; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; @@ -49,13 +57,6 @@ import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.binary.BinaryObjectBuilder; -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.binary.Binarylizable; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryReader; -import org.apache.ignite.binary.BinaryWriter; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -421,17 +422,33 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetTx() throws Exception { + public void testGetTx1() throws Exception { + checkGetTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetTx2() throws Exception { + checkGetTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. 
+ */ + private void checkGetTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, BinaryObject> kbCache = keepBinaryCache(); for (int i = 0; i < ENTRY_CNT; i++) c.put(i, new TestObject(i)); for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { TestObject obj = c.get(i); assertEquals(i, obj.val); @@ -441,32 +458,75 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA } for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - TestObject obj = c.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + BinaryObject val = kbCache.get(i); - assertEquals(i, obj.val); + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + + assertEquals(i, (int)val.field("val")); + + kbCache.put(i, val); tx.commit(); } } + } - IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); + /** + * @throws Exception If failed. + */ + public void testGetTxAsync1() throws Exception { + checkGetAsyncTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetTxAsync2() throws Exception { + checkGetAsyncTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. 
+ */ + private void checkGetAsyncTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, TestObject> cAsync = c.withAsync(); + IgniteCache<Integer, BinaryObject> kbCache = keepBinaryCache(); + IgniteCache<Integer, BinaryObject> kbCacheAsync = kbCache.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - BinaryObject po = kpc.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + cAsync.get(i); - assertEquals(i, (int)po.field("val")); + TestObject obj = (TestObject)cAsync.future().get(); + + assertEquals(i, obj.val); tx.commit(); } } for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - BinaryObject po = kpc.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + kbCacheAsync.get(i); - assertEquals(i, (int)po.field("val")); + BinaryObject val = (BinaryObject)kbCacheAsync.future().get(); + + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + + assertEquals(i, (int)val.field("val")); + + kbCacheAsync.put(i, val); + + kbCacheAsync.future().get(); tx.commit(); } @@ -606,11 +666,27 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetAllTx() throws Exception { + public void testGetAllTx1() throws Exception { + checkGetAllTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetAllTx2() throws Exception { + checkGetAllTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. 
+ */ + private void checkGetAllTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); for (int i = 0; i < ENTRY_CNT; i++) c.put(i, new TestObject(i)); @@ -621,18 +697,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<Integer, TestObject> objs = c.getAll(keys); - - assertEquals(10, objs.size()); - - for (Map.Entry<Integer, TestObject> e : objs.entrySet()) - assertEquals(e.getKey().intValue(), e.getValue().val); - - tx.commit(); - } - - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { Map<Integer, TestObject> objs = c.getAll(keys); assertEquals(10, objs.size()); @@ -644,32 +709,26 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA } } - IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); - for (int i = 0; i < ENTRY_CNT; ) { Set<Integer> keys = new HashSet<>(); for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { Map<Integer, BinaryObject> objs = kpc.getAll(keys); assertEquals(10, objs.size()); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); - - tx.commit(); - } + for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) { + BinaryObject val = e.getValue(); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - Map<Integer, BinaryObject> objs = kpc.getAll(keys); + assertEquals(new 
Integer(e.getKey().intValue()), val.field("val")); - assertEquals(10, objs.size()); + kpc.put(e.getKey(), val); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + } tx.commit(); } @@ -679,7 +738,22 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetAllAsyncTx() throws Exception { + public void testGetAllAsyncTx1() throws Exception { + checkGetAllAsyncTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetAllAsyncTx2() throws Exception { + checkGetAllAsyncTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + */ + private void checkGetAllAsyncTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; @@ -695,7 +769,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { cacheAsync.getAll(keys); Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get(); @@ -719,15 +793,20 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA IgniteCache<Integer, BinaryObject> asyncCache = cache.withAsync(); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { asyncCache.getAll(keys); Map<Integer, BinaryObject> objs = asyncCache.<Map<Integer, BinaryObject>>future().get(); assertEquals(10, objs.size()); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - 
assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) { + BinaryObject val = e.getValue(); + + assertEquals(new Integer(e.getKey().intValue()), val.field("val")); + + assertFalse("Key=" + e.getKey(), val instanceof BinaryObjectOffheapImpl); + } tx.commit(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e85a7170/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java index 5eb7b66..2b49597 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.binary.noncompact.BinaryMarshallerNonCompactSe import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderAdditionalNonCompactSelfTest; import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompactDefaultMappersSelfTest; import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompactSimpleNameLowerCaseMappersSelfTest; +import org.apache.ignite.internal.processors.cache.BinaryObjectOffHeapUnswapTemporaryTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesDefaultMappersSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreObjectsSelfTest; @@ -77,7 +78,7 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite { TestSuite suite = new TestSuite("Ignite Binary Objects Test Suite"); 
suite.addTestSuite(BinarySimpleNameTestPropertySelfTest.class); - + suite.addTestSuite(BinaryBasicIdMapperSelfTest.class); suite.addTestSuite(BinaryBasicNameMapperSelfTest.class); @@ -114,6 +115,7 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite { suite.addTestSuite(GridCacheBinaryObjectsPartitionedNearDisabledSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicNearDisabledSelfTest.class); + suite.addTestSuite(BinaryObjectOffHeapUnswapTemporaryTest.class); suite.addTestSuite(GridCacheBinaryObjectsLocalOffheapTieredSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicOffheapTieredSelfTest.class);
