Repository: ignite Updated Branches: refs/heads/ignite-4932 01ceeb134 -> 980b95f99
ignite-4932 added test and benchmark Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/980b95f9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/980b95f9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/980b95f9 Branch: refs/heads/ignite-4932 Commit: 980b95f9995478367d1d45c906eee9b07413ed8a Parents: 01ceeb1 Author: sboikov <[email protected]> Authored: Tue Apr 11 18:18:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Apr 11 18:18:07 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheNoSyncForGetTest.java | 217 +++++++++++++++++++ .../cache/IgniteGetFromComputeBenchmark.java | 176 +++++++++++++++ .../IgniteOffheapGetFromComputeBenchmark.java | 28 +++ 3 files changed, 421 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/980b95f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java new file mode 100644 index 0000000..b3470bc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static volatile CountDownLatch processorStartLatch; + + /** */ + private static volatile CountDownLatch hangLatch; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGet() throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + final IgniteCache cache = client.createCache(cacheConfiguration()); + + cache.put(1, 1); + + { + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName()))); + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + + { + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + assertEquals(1, srv.cache(cache.getName()).get(1)); + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setName("testCache"); + + return ccfg; + } + + /** + * + */ + static class HangEntryProcessor implements CacheEntryProcessor { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... arguments) { + assert processorStartLatch != null; + assert hangLatch != null; + + try { + processorStartLatch.countDown(); + + if (!hangLatch.await(60, TimeUnit.SECONDS)) + throw new RuntimeException("Failed to wait for latch"); + } + catch (Exception e) { + System.out.println("Unexpected error: " + e); + + throw new EntryProcessorException(e); + } + + entry.setValue(U.currentTimeMillis()); + + return null; + } + } + + /** + * + */ + public static class GetClosure implements IgniteCallable<Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final int key; + + /** */ + private final String cacheName; + + /** + * @param key Key. + */ + public GetClosure(int key, String cacheName) { + this.key = key; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return ignite.cache(cacheName).get(key); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/980b95f9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java new file mode 100644 index 0000000..b299f6c --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * + */ +public class IgniteGetFromComputeBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { + /** */ + private IgniteCompute compute; + + /** */ + private IgniteCache asyncCache; + + /** */ + private ThreadLocal<IgniteFuture> invokeFut = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + if (args.preloadAmount() > args.range()) + throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " + + "must by less then the range (\"-r\", \"--range\")."); + + String cacheName = cache().getName(); + + println(cfg, "Loading data for cache: " + cacheName); + + long start = System.nanoTime(); + + try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataStreamer(cacheName)) { + for (int i = 0; i < args.preloadAmount(); i++) { + dataLdr.addData(i, new SampleValue(i)); + + if (i % 100000 == 0) { + if (Thread.currentThread().isInterrupted()) + break; + + println("Loaded entries: " + i); + } + } + } + + println(cfg, "Finished populating data [time=" + ((System.nanoTime() - start) / 1_000_000) + "ms, " + + "amount=" + args.preloadAmount() + ']'); + + compute = ignite().compute(); + + GetClosure.cacheName = cacheName; + + asyncCache = cache().withAsync(); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + IgniteFuture fut = invokeFut.get(); + + if (fut == null || fut.isDone()) { + Set<Integer> keys = new TreeSet<>(); + + for (int i = 0; i < 3; i++) + keys.add(nextRandom(args.range())); + + asyncCache.invokeAll(keys, new SlowEntryProcessor(0)); + + invokeFut.set(asyncCache.future()); + } + + int key = nextRandom(args.range()); + + compute.affinityCall(cacheName(), key, new GetClosure(key)); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<Integer, Object> cache() { + return ignite().cache(cacheName()); + } + + /** + * @return Cache name. + */ + protected String cacheName() { + return "atomic"; + } + + /** + * + */ + public static class GetClosure implements IgniteCallable<Object> { + /** */ + static String cacheName; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final int key; + + /** + * @param key Key. + */ + public GetClosure(int key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return ignite.cache(cacheName).get(key); + } + } + + /** + * + */ + public static class SlowEntryProcessor implements CacheEntryProcessor<Integer, Object, Object> { + /** */ + private Object val; + + /** + * @param val Value. + */ + public SlowEntryProcessor(Object val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Integer, Object> entry, Object... args) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignore) { + // No-op. + } + + entry.setValue(val); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/980b95f9/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteOffheapGetFromComputeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteOffheapGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteOffheapGetFromComputeBenchmark.java new file mode 100644 index 0000000..04a2e83 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteOffheapGetFromComputeBenchmark.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +/** + * + */ +public class IgniteOffheapGetFromComputeBenchmark extends IgniteGetFromComputeBenchmark { + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "atomic-offheap"; + } +}
