Repository: ignite Updated Branches: refs/heads/ignite-1607 61df8c26d -> 353249795
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35324979 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35324979 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35324979 Branch: refs/heads/ignite-1607 Commit: 3532497953875f15a99a466baa33c22fdb962749 Parents: 61df8c2 Author: sboikov <[email protected]> Authored: Fri Oct 23 17:05:36 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 23 17:05:36 2015 +0300 ---------------------------------------------------------------------- .../cache/version/GridCacheVersionManager.java | 18 +- .../datastreamer/DataStreamProcessor.java | 3 +- .../datastreamer/DataStreamerImpl.java | 36 +--- .../datastreamer/DataStreamerRequest.java | 34 +--- .../datastreamer/DataStreamerUpdateJob.java | 33 ++-- .../DataStreamerUpdateAfterLoadTest.java | 184 +++++++++++++++++++ .../DateStreamerUpdateAfterLoadTest.java | 184 ------------------- .../ignite/testsuites/IgniteCacheTestSuite.java | 4 +- 8 files changed, 222 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 87fe515..68d03cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -61,6 +61,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { private long gridStartTime; /** */ + private GridCacheVersion ISOLATED_STREAMER_VER; + + /** */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { assert evt.type() == EVT_NODE_METRICS_UPDATED; @@ -154,8 +157,19 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * * @return Version for entries loaded with isolated streamer. */ - public GridCacheVersion nextForIsolatedStreamer() { - return next(0, true, false); + public GridCacheVersion isolatedStreamerVersion() { + if (ISOLATED_STREAMER_VER == null) { + long topVer = 1; + + if (gridStartTime == 0) + gridStartTime = cctx.kernalContext().discovery().gridStartTime(); + + topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000; + + ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, dataCenterId); + } + + return ISOLATED_STREAMER_VER; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 20a013b..5150d83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -289,8 +289,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { col, req.ignoreDeploymentOwnership(), req.skipStore(), - updater, - req.version()); + updater); Exception err = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 9f07541..2190bf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -213,9 +213,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private int maxRemapCnt = DFLT_MAX_REMAP_CNT; - /** */ - private GridCacheVersion ver; - /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */ private static boolean isWarningPrinted; @@ -302,8 +299,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed fut = new DataStreamerFuture(this); publicFut = new IgniteCacheFutureImpl<>(fut); - - ver = ctx.cache().context().versions().nextForIsolatedStreamer(); } /** @@ -1247,16 +1242,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; if (isLocNode) { - DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx, - log, - cacheName, - entries, - false, - skipStore, - rcvr, - rcvr == ISOLATED_UPDATER ? ver : null); - - fut = ctx.closure().callLocalSafe(job, false); + fut = ctx.closure().callLocalSafe( + new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, rcvr), false); locFuts.add(fut); @@ -1350,8 +1337,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - topVer, - rcvr == ISOLATED_UPDATER ? ver : null); + topVer); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); @@ -1551,7 +1537,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * Isolated receiver which only loads entry initial value. */ - static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, + private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, DataStreamerCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; @@ -1559,17 +1545,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@inheritDoc} */ @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache, Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { - receive(cache, entries, null); - } - - /** - * @param cache Cache. - * @param entries Entries. - * @param ver Entries version. - */ - void receive(IgniteCache<KeyCacheObject, CacheObject> cache, - Collection<Map.Entry<KeyCacheObject, CacheObject>> entries, - GridCacheVersion ver) { IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); @@ -1581,8 +1556,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - if (ver == null) - ver = cctx.versions().next(topVer); + GridCacheVersion ver = cctx.versions().isolatedStreamerVersion(); long ttl = CU.TTL_ETERNAL; long expiryTime = CU.EXPIRE_TIME_ETERNAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index 59810ee..c1a1528 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -25,7 +25,6 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -88,9 +87,6 @@ public class DataStreamerRequest implements Message { /** Topology version. */ private AffinityTopologyVersion topVer; - /** */ - private GridCacheVersion ver; - /** * {@code Externalizable} support. */ @@ -113,7 +109,6 @@ public class DataStreamerRequest implements Message { * @param clsLdrId Class loader ID. * @param forceLocDep Force local deployment. * @param topVer Topology version. - * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}. */ public DataStreamerRequest(long reqId, byte[] resTopicBytes, @@ -128,8 +123,7 @@ public class DataStreamerRequest implements Message { Map<UUID, IgniteUuid> ldrParticipants, IgniteUuid clsLdrId, boolean forceLocDep, - @NotNull AffinityTopologyVersion topVer, - @Nullable GridCacheVersion ver) { + @NotNull AffinityTopologyVersion topVer) { assert topVer != null; this.reqId = reqId; @@ -146,14 +140,6 @@ public class DataStreamerRequest implements Message { this.clsLdrId = clsLdrId; this.forceLocDep = forceLocDep; this.topVer = topVer; - this.ver = ver; - } - - /** - * @return Version. - */ - @Nullable public GridCacheVersion version() { - return ver; } /** @@ -355,12 +341,6 @@ public class DataStreamerRequest implements Message { writer.incrementState(); - case 14: - if (!writer.writeMessage("ver", ver)) - return false; - - writer.incrementState(); - } return true; @@ -490,14 +470,6 @@ public class DataStreamerRequest implements Message { reader.incrementState(); - case 14: - ver = reader.readMessage("ver"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(DataStreamerRequest.class); @@ -510,6 +482,6 @@ public class DataStreamerRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 14; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 12eee88..42084a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -57,9 +56,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { /** */ private final StreamReceiver rcvr; - /** */ - private final GridCacheVersion ver; - /** * @param ctx Context. * @param log Log. @@ -68,7 +64,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { * @param ignoreDepOwnership {@code True} to ignore deployment ownership. * @param skipStore Skip store flag. * @param rcvr Updater. - * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}. */ DataStreamerUpdateJob( GridKernalContext ctx, @@ -77,8 +72,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { Collection<DataStreamerEntry> col, boolean ignoreDepOwnership, boolean skipStore, - StreamReceiver<?, ?> rcvr, - @Nullable GridCacheVersion ver) { + StreamReceiver<?, ?> rcvr) { this.ctx = ctx; this.log = log; @@ -90,7 +84,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { this.ignoreDepOwnership = ignoreDepOwnership; this.skipStore = skipStore; this.rcvr = rcvr; - this.ver = ver; } /** {@inheritDoc} */ @@ -126,21 +119,17 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { checkSecurityPermission(SecurityPermission.CACHE_REMOVE); } - if (rcvr instanceof DataStreamerImpl.IsolatedUpdater) - ((DataStreamerImpl.IsolatedUpdater)rcvr).receive(cache, (Collection)col, ver); - else { - if (unwrapEntries()) { - Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { - @Override public Map.Entry apply(DataStreamerEntry e) { - return e.toEntry(cctx); - } - }); - - rcvr.receive(cache, col0); - } - else - rcvr.receive(cache, col); + if (unwrapEntries()) { + Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { + @Override public Map.Entry apply(DataStreamerEntry e) { + return e.toEntry(cctx); + } + }); + + rcvr.receive(cache, col0); } + else + rcvr.receive(cache, col); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java new file mode 100644 index 0000000..32e204b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class DataStreamerUpdateAfterLoadTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int NODES = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateAfterLoad() throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + int key = 0; + + try (IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg)) { + key = testLoadAndUpdate(cache.getName(), key, false); + + testLoadAndUpdate(cache.getName(), key, true); + + ignite0.destroyCache(cache.getName()); + } + } + } + + /** + * @param cacheName Cache name. + * @param key Key. + * @param allowOverwrite Streamer flag. + * @return Next key. + * @throws Exception If failed. + */ + private int testLoadAndUpdate(String cacheName, int key, boolean allowOverwrite) throws Exception { + for (int loadNode = 0; loadNode < NODES; loadNode++) { + Ignite loadIgnite = ignite(loadNode); + + for (int updateNode = 0; updateNode < NODES; updateNode++) { + try (IgniteDataStreamer<Integer, Integer> streamer = loadIgnite.dataStreamer(cacheName)) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData(key, key); + } + + Ignite updateIgnite = ignite(updateNode); + + IgniteCache<Integer, Integer> cache = updateIgnite.cache(cacheName); + + if (allowOverwrite) + atomicClockModeDelay(cache); + + updateIgnite.cache(cacheName).put(key, key + 1); + + checkValue(key, key + 1, cacheName); + + key++; + } + } + + return key; + } + + /** + * @param key Key. + * @param val Value. + * @param cacheName Cache name. + */ + private void checkValue(Integer key, Integer val, String cacheName) { + for (int i = 0; i < NODES; i++) { + IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName); + + assertEquals("Unexpected value " + i, val, cache.get(key)); + } + } + + /** + * @return Cache configurations to test. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 1, "cache-" + ccfgs.size())); + ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 0, "cache-" + ccfgs.size())); + + return ccfgs; + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param writeOrderMode Cache write order mode. + * @param backups Number of backups. + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode, + CacheAtomicWriteOrderMode writeOrderMode, + int backups, + String name) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setName(name); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setBackups(backups); + ccfg.setAtomicWriteOrderMode(writeOrderMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java deleted file mode 100644 index fc3e9e0..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.datastreamer; - -import java.util.ArrayList; -import java.util.List; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheAtomicWriteOrderMode; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * - */ -public class DateStreamerUpdateAfterLoadTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private boolean client; - - /** */ - private static final int NODES = 4; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - - cfg.setClientMode(client); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(NODES - 1); - - client = true; - - startGrid(NODES - 1); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - public void testUpdateAfterLoad() throws Exception { - Ignite ignite0 = ignite(0); - - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { - int key = 0; - - try (IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg)) { - key = testLoadAndUpdate(cache.getName(), key, false); - - testLoadAndUpdate(cache.getName(), key, true); - - ignite0.destroyCache(cache.getName()); - } - } - } - - /** - * @param cacheName Cache name. - * @param key Key. - * @param allowOverwrite Streamer flag. - * @return Next key. - * @throws Exception If failed. - */ - private int testLoadAndUpdate(String cacheName, int key, boolean allowOverwrite) throws Exception { - for (int loadNode = 0; loadNode < NODES; loadNode++) { - Ignite loadIgnite = ignite(loadNode); - - for (int updateNode = 0; updateNode < NODES; updateNode++) { - try (IgniteDataStreamer<Integer, Integer> streamer = loadIgnite.dataStreamer(cacheName)) { - streamer.allowOverwrite(allowOverwrite); - - streamer.addData(key, key); - } - - Ignite updateIgnite = ignite(updateNode); - - IgniteCache<Integer, Integer> cache = updateIgnite.cache(cacheName); - - if (allowOverwrite) - atomicClockModeDelay(cache); - - updateIgnite.cache(cacheName).put(key, key + 1); - - checkValue(key, key + 1, cacheName); - - key++; - } - } - - return key; - } - - /** - * @param key Key. - * @param val Value. - * @param cacheName Cache name. - */ - private void checkValue(Integer key, Integer val, String cacheName) { - for (int i = 0; i < NODES; i++) { - IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName); - - assertEquals("Unexpected value " + i, val, cache.get(key)); - } - } - - /** - * @return Cache configurations to test. - */ - private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { - List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); - - ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, "cache-" + ccfgs.size())); - ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, "cache-" + ccfgs.size())); - ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, "cache-" + ccfgs.size())); - ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 1, "cache-" + ccfgs.size())); - ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 0, "cache-" + ccfgs.size())); - - return ccfgs; - } - - /** - * @param atomicityMode Cache atomicity mode. - * @param writeOrderMode Cache write order mode. - * @param backups Number of backups. - * @param name Cache name. - * @return Cache configuration. - */ - private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode, - CacheAtomicWriteOrderMode writeOrderMode, - int backups, - String name) { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); - - ccfg.setName(name); - ccfg.setAtomicityMode(atomicityMode); - ccfg.setBackups(backups); - ccfg.setAtomicWriteOrderMode(writeOrderMode); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - - return ccfg; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 8f719cf..7925642 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -122,7 +122,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest; -import org.apache.ignite.internal.processors.datastreamer.DateStreamerUpdateAfterLoadTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateAfterLoadTest; import org.apache.ignite.testframework.GridTestUtils; /** @@ -214,7 +214,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, DateStreamerUpdateAfterLoadTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, DataStreamerUpdateAfterLoadTest.class, ignoredTests); suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class); suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class); suite.addTestSuite(DataStreamerImplSelfTest.class);
