ignite-2835: Fixed BinaryObjectOffHeapImpl leakage to public code (cherry picked from e85a71)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f970c11b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f970c11b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f970c11b Branch: refs/heads/ignite-1786 Commit: f970c11b3be14d3888e4843aca0d8b05ccac9986 Parents: 196954f Author: ashutak <[email protected]> Authored: Thu Apr 7 13:49:34 2016 +0300 Committer: ashutak <[email protected]> Committed: Thu Apr 7 13:49:34 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 7 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../BinaryObjectOffHeapUnswapTemporaryTest.java | 362 +++++++++++++++++++ .../GridCacheBinaryObjectsAbstractSelfTest.java | 181 +++++++--- .../IgniteBinaryObjectsTestSuite.java | 6 +- 5 files changed, 503 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f970c11b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index cc1a8d3..a448307 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -904,9 +905,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (ret != null) + if (ret != null) { + assert tmp || !(ret instanceof BinaryObjectOffheapImpl); + // If return value is consistent, then done. return retVer ? new T2<>(ret, resVer) : ret; + } boolean loadedFromStore = false; @@ -980,6 +984,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } assert ret == null || !retVer; + assert tmp || !(ret instanceof BinaryObjectOffheapImpl); return ret; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f970c11b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b1e150b..ef69ad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1889,7 +1889,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /*unmarshal*/true, /*metrics*/true, /*events*/!skipVals, - /*temporary*/true, + /*temporary*/false, CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, resolveTaskName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f970c11b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java new file mode 100644 index 0000000..f26191f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryObjectOffHeapUnswapTemporaryTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; + +/** + * + */ +public class BinaryObjectOffHeapUnswapTemporaryTest extends GridCommonAbstractTest { + /** */ + private static final int CNT = 20; + + /** */ + @SuppressWarnings("serial") + private static final CacheEntryProcessor PROC = new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException { + return entry.getValue(); + } + }; + + /** */ + private CacheAtomicityMode atomicityMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshaller(new BinaryMarshaller()); + + return c; + } + + /** + * @param atomicityMode Atomicity mode. + * @param memoryMode Memory mode. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + this.atomicityMode = atomicityMode; + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setAtomicityMode(atomicityMode); + cfg.setMemoryMode(memoryMode); + cfg.setBackups(1); + cfg.setSwapEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValues() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(TRANSACTIONAL, OFFHEAP_VALUES)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTiered() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(ATOMIC, OFFHEAP_TIERED)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValues() throws Exception { + ignite(0).getOrCreateCache(cacheConfiguration(ATOMIC, OFFHEAP_VALUES)); + + try { + doTest(); + } + finally { + ignite(0).destroyCache(null); + } + } + + /** + * + */ + @SuppressWarnings("serial") + private void doTest() { + final IgniteCache<Integer, BinaryObject> cache = jcache(0).withKeepBinary(); + + for (int key = 0; key < CNT; key++) + jcache(0).put(key, new TestObject(key)); + + for (int key = CNT; key < 2 * CNT; key++) { + BinaryObjectBuilder builder = ignite(0).binary().builder("SomeType"); + builder.setField("field1", key); + builder.setField("field2", "name_" + key); + + cache.put(key, builder.build()); + } + + Set<Integer> keys = new LinkedHashSet<>(); + + for (int i = 0; i < 2 * CNT; i++) + keys.add(i); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.get(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.getEntry(key).getValue() instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.getAndPut(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.getAndReplace(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.getAndPutIfAbsent(key, cache.get(key)) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.localPeek(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.getAndRemove(key) instanceof BinaryObjectOffheapImpl); + } + }); + + check(new IgniteInClosure<Integer>() { + @Override public void apply(Integer key) { + assertFalse(cache.invoke(key, PROC) instanceof BinaryObjectOffheapImpl); + } + }); + + // GetAll. + Map<Integer, BinaryObject> res = cache.getAll(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.getAll(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // GetAllOutTx. + res = cache.getAllOutTx(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.getAllOutTx(keys); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // InvokeAll. + res = cache.invokeAll(keys, PROC); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + res = cache.invokeAll(keys, PROC); + + for (BinaryObject val : res.values()) + assertFalse(val instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + + // GetEntries. + Collection<CacheEntry<Integer, BinaryObject>> entries = cache.getEntries(keys); + + for (CacheEntry<Integer, BinaryObject> e : entries) + assertFalse(e.getValue() instanceof BinaryObjectOffheapImpl); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + entries = cache.getEntries(keys); + + for (CacheEntry<Integer, BinaryObject> e : entries) + assertFalse(e.getValue() instanceof BinaryObjectOffheapImpl); + + tx.commit(); + } + } + } + } + } + + /** + * + */ + private void check(IgniteInClosure<Integer> checkOp) { + for (int key = 0; key < 2 * CNT; key++) { + checkOp.apply(key); + + if (atomicityMode == TRANSACTIONAL) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) { + checkOp.apply(key); + + tx.commit(); + } + } + } + } + } + } + + /** + * + */ + @SuppressWarnings("PackageVisibleField") + private static class TestObject { + /** */ + String field; + + /** */ + int field2; + + /** + * @param key Key. + */ + TestObject(int key) { + field = "str" + key; + field2 = key; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f970c11b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java index 36a9450..f776146 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java @@ -29,9 +29,15 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteBinary; +import org.apache.ignite.IgniteCache; import org.apache.ignite.binary.BinaryNameMapper; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -41,7 +47,9 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectImpl; +import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; @@ -49,13 +57,6 @@ import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.binary.BinaryObjectBuilder; -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.binary.Binarylizable; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryReader; -import org.apache.ignite.binary.BinaryWriter; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -421,17 +422,33 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetTx() throws Exception { + public void testGetTx1() throws Exception { + checkGetTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetTx2() throws Exception { + checkGetTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + */ + private void checkGetTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, BinaryObject> kbCache = keepBinaryCache(); for (int i = 0; i < ENTRY_CNT; i++) c.put(i, new TestObject(i)); for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { TestObject obj = c.get(i); assertEquals(i, obj.val); @@ -441,32 +458,75 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA } for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - TestObject obj = c.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + BinaryObject val = kbCache.get(i); - assertEquals(i, obj.val); + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + + assertEquals(i, (int)val.field("val")); + + kbCache.put(i, val); tx.commit(); } } + } - IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); + /** + * @throws Exception If failed. + */ + public void testGetTxAsync1() throws Exception { + checkGetAsyncTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetTxAsync2() throws Exception { + checkGetAsyncTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + */ + private void checkGetAsyncTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, TestObject> cAsync = c.withAsync(); + IgniteCache<Integer, BinaryObject> kbCache = keepBinaryCache(); + IgniteCache<Integer, BinaryObject> kbCacheAsync = kbCache.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - BinaryObject po = kpc.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + cAsync.get(i); - assertEquals(i, (int)po.field("val")); + TestObject obj = (TestObject)cAsync.future().get(); + + assertEquals(i, obj.val); tx.commit(); } } for (int i = 0; i < ENTRY_CNT; i++) { - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - BinaryObject po = kpc.get(i); + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + kbCacheAsync.get(i); - assertEquals(i, (int)po.field("val")); + BinaryObject val = (BinaryObject)kbCacheAsync.future().get(); + + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + + assertEquals(i, (int)val.field("val")); + + kbCacheAsync.put(i, val); + + kbCacheAsync.future().get(); tx.commit(); } @@ -606,11 +666,27 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetAllTx() throws Exception { + public void testGetAllTx1() throws Exception { + checkGetAllTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetAllTx2() throws Exception { + checkGetAllTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + */ + private void checkGetAllTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); for (int i = 0; i < ENTRY_CNT; i++) c.put(i, new TestObject(i)); @@ -621,18 +697,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<Integer, TestObject> objs = c.getAll(keys); - - assertEquals(10, objs.size()); - - for (Map.Entry<Integer, TestObject> e : objs.entrySet()) - assertEquals(e.getKey().intValue(), e.getValue().val); - - tx.commit(); - } - - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { Map<Integer, TestObject> objs = c.getAll(keys); assertEquals(10, objs.size()); @@ -644,32 +709,26 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA } } - IgniteCache<Integer, BinaryObject> kpc = keepBinaryCache(); - for (int i = 0; i < ENTRY_CNT; ) { Set<Integer> keys = new HashSet<>(); for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { Map<Integer, BinaryObject> objs = kpc.getAll(keys); assertEquals(10, objs.size()); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); - - tx.commit(); - } + for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) { + BinaryObject val = e.getValue(); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { - Map<Integer, BinaryObject> objs = kpc.getAll(keys); + assertEquals(new Integer(e.getKey().intValue()), val.field("val")); - assertEquals(10, objs.size()); + kpc.put(e.getKey(), val); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + assertFalse("Key=" + i, val instanceof BinaryObjectOffheapImpl); + } tx.commit(); } @@ -679,7 +738,22 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testGetAllAsyncTx() throws Exception { + public void testGetAllAsyncTx1() throws Exception { + checkGetAllAsyncTx(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetAllAsyncTx2() throws Exception { + checkGetAllAsyncTx(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + */ + private void checkGetAllAsyncTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { if (atomicityMode() != TRANSACTIONAL) return; @@ -695,7 +769,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA for (int j = 0; j < 10; j++) keys.add(i++); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { cacheAsync.getAll(keys); Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get(); @@ -719,15 +793,20 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA IgniteCache<Integer, BinaryObject> asyncCache = cache.withAsync(); - try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { asyncCache.getAll(keys); Map<Integer, BinaryObject> objs = asyncCache.<Map<Integer, BinaryObject>>future().get(); assertEquals(10, objs.size()); - for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) - assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + for (Map.Entry<Integer, BinaryObject> e : objs.entrySet()) { + BinaryObject val = e.getValue(); + + assertEquals(new Integer(e.getKey().intValue()), val.field("val")); + + assertFalse("Key=" + e.getKey(), val instanceof BinaryObjectOffheapImpl); + } tx.commit(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f970c11b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java index 043ea06..cedf9a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java @@ -44,11 +44,12 @@ import org.apache.ignite.internal.binary.noncompact.BinaryMarshallerNonCompactSe import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderAdditionalNonCompactSelfTest; import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompactDefaultMappersSelfTest; import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompactSimpleNameLowerCaseMappersSelfTest; +import org.apache.ignite.internal.binary.streams.BinaryHeapStreamByteOrderSelfTest; +import org.apache.ignite.internal.binary.streams.BinaryOffheapStreamByteOrderSelfTest; +import org.apache.ignite.internal.processors.cache.BinaryObjectOffHeapUnswapTemporaryTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryObjectUserClassloaderSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesDefaultMappersSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest; -import org.apache.ignite.internal.binary.streams.BinaryHeapStreamByteOrderSelfTest; -import org.apache.ignite.internal.binary.streams.BinaryOffheapStreamByteOrderSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreObjectsSelfTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheClientNodeBinaryObjectMetadataMultinodeTest; import org.apache.ignite.internal.processors.cache.binary.GridCacheClientNodeBinaryObjectMetadataTest; @@ -117,6 +118,7 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite { suite.addTestSuite(GridCacheBinaryObjectsPartitionedNearDisabledSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicNearDisabledSelfTest.class); + suite.addTestSuite(BinaryObjectOffHeapUnswapTemporaryTest.class); suite.addTestSuite(GridCacheBinaryObjectsLocalOffheapTieredSelfTest.class); suite.addTestSuite(GridCacheBinaryObjectsAtomicOffheapTieredSelfTest.class);
