Repository: ignite Updated Branches: refs/heads/ignite-1607 9f77d2723 -> dce9bd592
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dce9bd59 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dce9bd59 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dce9bd59 Branch: refs/heads/ignite-1607 Commit: dce9bd592f774f01cf7d2bc2b2c4253824c7ab2e Parents: 9f77d27 Author: sboikov <[email protected]> Authored: Fri Oct 23 10:30:44 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 23 10:30:44 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 38 ++-- .../dht/atomic/GridDhtAtomicCache.java | 1 + .../cache/version/GridCacheVersionManager.java | 10 + .../datastreamer/DataStreamerImpl.java | 9 +- .../DateStreamerUpdateAfterLoadTest.java | 184 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 4 +- 6 files changed, 222 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 621ed99..2111594 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3216,34 +3216,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); if (curVer == null || curVer.equals(ver)) { - GridCacheMvcc mvcc = mvccExtras(); + if (val != this.val) { + GridCacheMvcc mvcc = mvccExtras(); - if (mvcc != null && !mvcc.isEmpty()) - return null; + if (mvcc != null && !mvcc.isEmpty()) + return null; - if (newVer == null) - newVer = cctx.versions().next(); + if (newVer == null) + newVer = cctx.versions().next(); - CacheObject old = rawGetOrUnmarshalUnlocked(false); + CacheObject old = rawGetOrUnmarshalUnlocked(false); - long ttl = ttlExtras(); + long ttl = ttlExtras(); - long expTime = CU.toExpireTime(ttl); + long expTime = CU.toExpireTime(ttl); - // Detach value before index update. - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + // Detach value before index update. + val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (val != null) { - updateIndex(val, expTime, newVer, old); + if (val != null) { + updateIndex(val, expTime, newVer, old); - if (deletedUnlocked()) - deletedUnlocked(false); - } + if (deletedUnlocked()) + deletedUnlocked(false); + } - // Version does not change for load ops. - update(val, expTime, ttl, newVer); + // Version does not change for load ops. + update(val, expTime, ttl, newVer); - return newVer; + return newVer; + } } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9f5ad3e..3c4a5b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1657,6 +1657,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (idx != null) { GridDhtCacheEntry entry = entries.get(idx); + try { GridCacheVersion ver = entry.version(); http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 7a4be0a..87fe515 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -149,6 +149,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { } /** + * Version for entries loaded with isolated streamer, should be less than any version generated + * for entries update. + * + * @return Version for entries loaded with isolated streamer. + */ + public GridCacheVersion nextForIsolatedStreamer() { + return next(0, true, false); + } + + /** * @return Next version based on current topology. */ public GridCacheVersion next() { http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index bf9dc78..9f07541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -302,6 +302,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed fut = new DataStreamerFuture(this); publicFut = new IgniteCacheFutureImpl<>(fut); + + ver = ctx.cache().context().versions().nextForIsolatedStreamer(); } /** @@ -1252,7 +1254,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed false, skipStore, rcvr, - ver); + rcvr == ISOLATED_UPDATER ? ver : null); fut = ctx.closure().callLocalSafe(job, false); @@ -1288,9 +1290,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed assert rcvr != null; updaterBytes = ctx.config().getMarshaller().marshal(rcvr); - - if (rcvr == ISOLATED_UPDATER) - ver = ctx.cache().context().versions().next(); } if (topicBytes == null) @@ -1352,7 +1351,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.classLoaderId() : null, dep == null, topVer, - ver); + rcvr == ISOLATED_UPDATER ? ver : null); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java new file mode 100644 index 0000000..fc3e9e0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class DateStreamerUpdateAfterLoadTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int NODES = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateAfterLoad() throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + int key = 0; + + try (IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg)) { + key = testLoadAndUpdate(cache.getName(), key, false); + + testLoadAndUpdate(cache.getName(), key, true); + + ignite0.destroyCache(cache.getName()); + } + } + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param allowOverwrite Streamer flag. + * @return Next key. + * @throws Exception If failed. + */ + private int testLoadAndUpdate(String cacheName, int key, boolean allowOverwrite) throws Exception { + for (int loadNode = 0; loadNode < NODES; loadNode++) { + Ignite loadIgnite = ignite(loadNode); + + for (int updateNode = 0; updateNode < NODES; updateNode++) { + try (IgniteDataStreamer<Integer, Integer> streamer = loadIgnite.dataStreamer(cacheName)) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData(key, key); + } + + Ignite updateIgnite = ignite(updateNode); + + IgniteCache<Integer, Integer> cache = updateIgnite.cache(cacheName); + + if (allowOverwrite) + atomicClockModeDelay(cache); + + updateIgnite.cache(cacheName).put(key, key + 1); + + checkValue(key, key + 1, cacheName); + + key++; + } + } + + return key; + } + + /** + * @param key Key. + * @param val Value. + * @param cacheName Cache name. + */ + private void checkValue(Integer key, Integer val, String cacheName) { + for (int i = 0; i < NODES; i++) { + IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName); + + assertEquals("Unexpected value " + i, val, cache.get(key)); + } + } + + /** + * @return Cache configurations to test. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 0, "cache-" + ccfgs.size())); + + return ccfgs; + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param writeOrderMode Cache write order mode. + * @param backups Number of backups. + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode, + CacheAtomicWriteOrderMode writeOrderMode, + int backups, + String name) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setName(name); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setBackups(backups); + ccfg.setAtomicWriteOrderMode(writeOrderMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 1deb3bc..8f719cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -122,6 +122,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest; +import org.apache.ignite.internal.processors.datastreamer.DateStreamerUpdateAfterLoadTest; import org.apache.ignite.testframework.GridTestUtils; /** @@ -213,7 +214,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests); - suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class); + GridTestUtils.addTestIfNeeded(suite, DateStreamerUpdateAfterLoadTest.class, ignoredTests); + suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class); suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class); suite.addTestSuite(DataStreamerImplSelfTest.class); GridTestUtils.addTestIfNeeded(suite, GridCacheEntryMemorySizeSelfTest.class, ignoredTests);
