http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java new file mode 100644 index 0000000..955a792 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests cache in-place modification logic with iterative value increment. + */ +public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of nodes to test on. */ + private static final int GRID_CNT = 2; + + /** Number of increment iterations. 
*/ + private static final int NUM_SETS = 50; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = new CacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setAtomicityMode(atomicityMode()); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setBackups(1); + cache.setRebalanceMode(SYNC); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @return Atomicity mode. + */ + protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleEntryProcessorNodeJoin() throws Exception { + checkEntryProcessorNodeJoin(false); + } + + /** + * @throws Exception If failed. + */ + public void testAllEntryProcessorNodeJoin() throws Exception { + checkEntryProcessorNodeJoin(true); + } + + /** + * @param invokeAll If {@code true} tests invokeAll operation. + * @throws Exception If failed. 
+ */ + private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final AtomicReference<Throwable> error = new AtomicReference<>(); + final int started = 6; + + try { + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < started; i++) { + U.sleep(1_000); + + startGrid(GRID_CNT + i); + } + } + catch (Exception e) { + error.compareAndSet(null, e); + } + } + }, 1, "starter"); + + try { + checkIncrement(invokeAll); + } + finally { + stop.set(true); + + fut.get(getTestTimeout()); + } + + for (int i = 0; i < NUM_SETS; i++) { + for (int g = 0; g < GRID_CNT + started; g++) { + Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i); + + assertNotNull(vals); + assertEquals(100, vals.size()); + } + } + } + finally { + for (int i = 0; i < started; i++) + stopGrid(GRID_CNT + i); + } + } + + /** + * @param invokeAll If {@code true} tests invokeAll operation. + * @throws Exception If failed. + */ + private void checkIncrement(boolean invokeAll) throws Exception { + for (int k = 0; k < 100; k++) { + if (invokeAll) { + IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + + Map<String, Processor> procs = new LinkedHashMap<>(); + + for (int i = 0; i < NUM_SETS; i++) { + String key = "set-" + i; + + String val = "value-" + k; + + cache.invoke(key, new Processor(val)); + } + + cache.invokeAll(procs); + } + else { + for (int i = 0; i < NUM_SETS; i++) { + String key = "set-" + i; + + String val = "value-" + k; + + IgniteCache<String, Set<String>> cache = ignite(0).cache(null); + + cache.invoke(key, new Processor(val)); + } + } + } + } + + /** */ + private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable { + /** */ + private String val; + + /** + * @param val Value. 
+ */ + private Processor(String val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) { + Set<String> vals = e.getValue(); + + if (vals == null) + vals = new HashSet<>(); + + vals.add(val); + + e.setValue(vals); + + return null; + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
new file mode 100644
index 0000000..7b69674
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Checks values returned by {@code getTopologySafe()} while a transaction is held open and a node is started.
+ */
+public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest {
+    /** Number of initial grids. */
+    public static final int GRID_CNT = 4;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** TX commit latch. */
+    private CountDownLatch releaseLatch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(
+            cacheCfg("tx", TRANSACTIONAL, false),
+            cacheCfg("atomic", ATOMIC, false),
+            cacheCfg("tx_near", TRANSACTIONAL, true),
+            cacheCfg("atomic_near", ATOMIC, true));
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @param near Near enabled flag.
+     * @return Cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) {
+        CacheConfiguration cfg = new CacheConfiguration(name);
+
+        cfg.setAtomicityMode(cacheMode);
+        cfg.setBackups(1);
+
+        if (near)
+            cfg.setNearConfiguration(new NearCacheConfiguration());
+        else
+            cfg.setNearConfiguration(null);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetTopologySafeNodeJoin() throws Exception {
+        checkGetTopologySafeNodeJoin(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception {
+        checkGetTopologySafeNodeJoin(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception {
+        startGrids(GRID_CNT);
+
+        awaitPartitionMapExchange();
+
+        try {
+            ClusterNode targetNode = ignite(1).cluster().localNode();
+
+            info(">>> Target node: " + targetNode.id());
+
+            // Populate caches with a key that does not belong to ignite(0).
+            int key = -1;
+            for (int i = 0; i < 100; i++) {
+                Collection<ClusterNode> nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i);
+                ClusterNode primaryNode = F.first(nodes);
+
+                if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) {
+                    ignite(1).cache("tx").put(i, i);
+                    ignite(1).cache("atomic").put(i, i);
+                    ignite(1).cache("tx_near").put(i, i);
+                    ignite(1).cache("atomic_near").put(i, i);
+
+                    key = i;
+
+
+                    break;
+                }
+            }
+
+            assertTrue(key != -1);
+
+            IgniteInternalFuture<?> txFut = startBlockingTxAsync();
+
+            IgniteInternalFuture<?> nodeFut = startNodeAsync();
+
+            if (failPrimary)
+                stopGrid(1);
+
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key));
+
+            releaseTx();
+
+            txFut.get();
+            nodeFut.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @return Future.
+     * @throws Exception If failed.
+     */
+    private IgniteInternalFuture<?> startNodeAsync() throws Exception {
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                startGrid(GRID_CNT);
+
+                return null;
+            }
+        });
+
+        U.sleep(1000);
+
+        return fut;
+    }
+
+    /**
+     * @return TX release future.
+     * @throws Exception If failed.
+     */
+    private IgniteInternalFuture<?> startBlockingTxAsync() throws Exception {
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = 0; i < 30; i++)
+                        ignite(0).cache("tx").get("value-" + i);
+
+                    releaseLatch = new CountDownLatch(1);
+
+                    lockLatch.countDown();
+
+                    releaseLatch.await();
+                }
+
+                return null;
+            }
+        });
+
+        lockLatch.await();
+
+        return fut;
+    }
+
+    /**
+     * Lets the transaction opened by {@link #startBlockingTxAsync()} finish by counting down the release latch.
+     */
+    private void releaseTx() {
+        assert releaseLatch != null;
+
+        releaseLatch.countDown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java index 8f107e4..013dd74 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java @@ -41,6 +41,12 @@ import static org.apache.ignite.events.EventType.*; * Test cases for multi-threaded tests. */ public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE1 = null; + + /** */ + private static final String CACHE2 = "cache2"; + /** Grid 1. 
*/ private static Ignite ignite1; @@ -70,12 +76,20 @@ public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstr cfg.setDiscoverySpi(disco); - cfg.setCacheConfiguration(defaultCacheConfiguration()); + CacheConfiguration ccfg1 = cacheConfiguration().setName(CACHE1); + CacheConfiguration ccfg2 = cacheConfiguration().setName(CACHE2); + + cfg.setCacheConfiguration(ccfg1, ccfg2); return cfg; } /** + * @return Cache configuration. + */ + protected abstract CacheConfiguration cacheConfiguration(); + + /** * @return {@code True} for partitioned caches. */ protected boolean partitioned() { @@ -529,6 +543,31 @@ public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstr } /** + * @throws Exception If failed. + */ + public void testTwoCaches() throws Exception { + IgniteCache<Integer, String> cache1 = ignite1.cache(CACHE1); + IgniteCache<Integer, String> cache2 = ignite1.cache(CACHE2); + + final Integer key = primaryKey(cache1); + + Lock lock = cache1.lock(key); + + lock.lock(); + + try { + assertTrue(cache1.isLocalLocked(key, true)); + assertTrue(cache1.isLocalLocked(key, false)); + + assertFalse(cache2.isLocalLocked(key, true)); + assertFalse(cache2.isLocalLocked(key, false)); + } + finally { + lock.unlock(); + } + } + + /** * Cache unlock listener. 
*/ private class UnlockListener implements IgnitePredicate<Event> { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java index 459e015..d05764c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java @@ -165,6 +165,8 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { startGrids(GRID_CNT); + awaitPartitionMapExchange(); + ignites = new Ignite[GRID_CNT]; ids = new UUID[GRID_CNT]; caches = new IgniteCache[GRID_CNT]; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java new file mode 100644 index 0000000..5432e76 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest { + /** */ + private static final 
TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final int KEY_RANGE = 1000; + + /** */ + private static final long TEST_TIME = 3 * 60_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (gridName.equals(getTestGridName(GRID_CNT - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param parts Number of partitions. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, CacheMode cacheMode, int parts) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + ccfg.setAffinity(new RendezvousAffinityFunction(false, parts)); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIME + 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testCrossCachePessimisticTxFailover() throws Exception { + crossCacheTxFailover(PARTITIONED, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. 
+ */ + public void testCrossCachePessimisticTxFailoverDifferentAffinity() throws Exception { + crossCacheTxFailover(PARTITIONED, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheOptimisticTxFailover() throws Exception { + crossCacheTxFailover(PARTITIONED, true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheOptimisticTxFailoverDifferentAffinity() throws Exception { + crossCacheTxFailover(PARTITIONED, false, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCachePessimisticTxFailoverReplicated() throws Exception { + crossCacheTxFailover(REPLICATED, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheOptimisticTxFailoverReplicated() throws Exception { + crossCacheTxFailover(REPLICATED, true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCachePessimisticTxFailoverDifferentAffinityReplicated() throws Exception { + crossCacheTxFailover(PARTITIONED, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @param cacheMode Cache mode. + * @param sameAff If {@code false} uses different number of partitions for caches. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void crossCacheTxFailover(CacheMode cacheMode, + boolean sameAff, + final TransactionConcurrency concurrency, + final TransactionIsolation isolation) throws Exception { + IgniteKernal ignite0 = (IgniteKernal)ignite(0); + + final AtomicBoolean stop = new AtomicBoolean(); + + try { + ignite0.createCache(cacheConfiguration(CACHE1, cacheMode, 256)); + ignite0.createCache(cacheConfiguration(CACHE2, cacheMode, sameAff ? 
256 : 128)); + + final AtomicInteger threadIdx = new AtomicInteger(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = threadIdx.getAndIncrement(); + + Ignite ignite = ignite(idx % GRID_CNT); + + log.info("Started update thread [node=" + ignite.name() + + ", client=" + ignite.configuration().isClientMode() + ']'); + + IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1); + IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2); + + assertNotSame(cache1, cache2); + + IgniteTransactions txs = ignite.transactions(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long iter = 0; + + while (!stop.get()) { + boolean sameKey = rnd.nextBoolean(); + + try { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + if (sameKey) { + TestKey key = new TestKey(rnd.nextLong(KEY_RANGE)); + + cacheOperation(rnd, cache1, key); + cacheOperation(rnd, cache2, key); + } + else { + TestKey key1 = new TestKey(rnd.nextLong(KEY_RANGE)); + TestKey key2 = new TestKey(key1.key() + 1); + + cacheOperation(rnd, cache1, key1); + cacheOperation(rnd, cache2, key2); + } + + tx.commit(); + } + } + catch (CacheException | IgniteException e) { + log.info("Update error: " + e); + } + + if (iter++ % 500 == 0) + log.info("Iteration: " + iter); + } + + return null; + } + + /** + * @param rnd Random. + * @param cache Cache. + * @param key Key. + */ + private void cacheOperation(ThreadLocalRandom rnd, IgniteCache<TestKey, TestValue> cache, TestKey key) { + switch (rnd.nextInt(4)) { + case 0: + cache.put(key, new TestValue(rnd.nextLong())); + + break; + + case 1: + cache.remove(key); + + break; + + case 2: + cache.invoke(key, new TestEntryProcessor(rnd.nextBoolean() ? 
1L : null)); + + break; + + case 3: + cache.get(key); + + break; + + default: + assert false; + } + } + }, 10, "tx-thread"); + + long stopTime = System.currentTimeMillis() + 3 * 60_000; + + long topVer = ignite0.cluster().topologyVersion(); + + boolean failed = false; + + while (System.currentTimeMillis() < stopTime) { + log.info("Start node."); + + IgniteKernal ignite = (IgniteKernal)startGrid(GRID_CNT); + + assertFalse(ignite.configuration().isClientMode()); + + topVer++; + + IgniteInternalFuture<?> affFut = ignite.context().cache().context().exchange().affinityReadyFuture( + new AffinityTopologyVersion(topVer)); + + try { + if (affFut != null) + affFut.get(30_000); + } + catch (IgniteFutureTimeoutCheckedException e) { + log.error("Failed to wait for affinity future after start: " + topVer); + + failed = true; + + break; + } + + Thread.sleep(500); + + log.info("Stop node."); + + stopGrid(GRID_CNT); + + topVer++; + + affFut = ignite0.context().cache().context().exchange().affinityReadyFuture( + new AffinityTopologyVersion(topVer)); + + try { + if (affFut != null) + affFut.get(30_000); + } + catch (IgniteFutureTimeoutCheckedException e) { + log.error("Failed to wait for affinity future after stop: " + topVer); + + failed = true; + + break; + } + } + + stop.set(true); + + fut.get(); + + assertFalse("Test failed, see log for details.", failed); + } + finally { + stop.set(true); + + ignite0.destroyCache(CACHE1); + ignite0.destroyCache(CACHE2); + + awaitPartitionMapExchange(); + } + } + + /** + * + */ + private static class TestKey implements Serializable { + /** */ + private long key; + + /** + * @param key Key. + */ + public TestKey(long key) { + this.key = key; + } + + /** + * @return Key. 
+ */ + public long key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey)o; + + return key == testKey.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(key ^ (key >>> 32)); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private long val; + + /** + * @param val Value. + */ + public TestValue(long val) { + this.val = val; + } + + /** + * @return Value. + */ + public long value() { + return val; + } + } + + /** + * + */ + private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> { + /** */ + private Long val; + + /** + * @param val Value. + */ + public TestEntryProcessor(@Nullable Long val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... 
args) { + TestValue old = e.getValue(); + + if (val != null) + e.setValue(new TestValue(val)); + + return old; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index c798369..5d0cacc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -54,6 +54,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr cfg.setAtomicWriteOrderMode(writeOrderMode()); cfg.setBackups(1); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 1d14dec..0ab5729 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -19,17 +19,29 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + /** * */ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** */ + private static final int FACTOR = 1000; + /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { return CacheAtomicityMode.TRANSACTIONAL; @@ -76,4 +88,179 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr finished.set(true); } } + + /** {@inheritDoc} */ + public void testExplicitTransactionRetries() throws Exception { + final AtomicInteger idx = new AtomicInteger(); + int threads = 8; + + final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override + public Object call() throws Exception { + int th = idx.getAndIncrement(); + int base = th * FACTOR; + + Ignite ignite = ignite(0); + final IgniteCache<Object, Object> cache = ignite.cache(null); + + try { + for (int i = 0; i < FACTOR; i++) { + doInTransaction(ignite, new ProcessCallable(cache, base, i)); + + if (i > 0 && i % 500 == 0) + info("Done: " + i); + } + } + catch (Exception e) { + err.set(th, e); + 
} + + return null; + } + }, threads, "tx-runner"); + + while (!fut.isDone()) { + int stopIdx = ThreadLocalRandom.current().nextInt(2, 4); // Random in [2, 3]. + + stopGrid(stopIdx); + + U.sleep(500); + + startGrid(stopIdx); + } + + for (int i = 0; i < threads; i++) { + Exception error = err.get(i); + + if (error != null) + throw error; + } + + // Verify contents of the cache. + for (int g = 0; g < gridCount(); g++) { + IgniteCache<Object, Object> cache = ignite(g).cache(null); + + for (int th = 0; th < threads; th++) { + int base = th * FACTOR; + + String key = "key-" + base; + + Set<String> set = (Set<String>)cache.get(key); + + assertNotNull("Missing set for key: " + key, set); + assertEquals(FACTOR, set.size()); + + for (int i = 0; i < FACTOR; i++) { + assertEquals("value-" + i, cache.get("key-" + base + "-" + i)); + assertTrue(set.contains("value-" + i)); + } + } + } + } + + /** + * @param ignite Ignite instance. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception If failed. + */ + private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.call(); + + tx.commit(); + + return res; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + IgniteFuture<?> fut = e.retryReadyFuture(); + + fut.get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } + + /** + * Callable to process inside transaction. + */ + private static class ProcessCallable implements Callable<Void> { + /** */ + private IgniteCache cache; + + /** */ + private int base; + + /** */ + private int i; + + /** + * @param cache Cache. + * @param base Base index. 
+ * @param i Iteration index. + */ + private ProcessCallable(IgniteCache<Object, Object> cache, int base, int i) { + this.cache = cache; + this.base = base; + this.i = i; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Void call() throws Exception { + String key1 = "key-" + base + "-" + i; + String key2 = "key-" + base; + + assert key1.compareTo(key2) > 0; + + ((IgniteCache<String, String>)cache).put(key1, "value-" + i); + + ((IgniteCache<String, Set<String>>)cache).invoke(key2, new AddEntryProcessor("value-" + i)); + + return null; + } + } + + /** + * + */ + private static class AddEntryProcessor implements CacheEntryProcessor<String, Set<String>, Void> { + /** */ + private String addVal; + + /** + * @param addVal Value to add. + */ + private AddEntryProcessor(String addVal) { + this.addVal = addVal; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<String, Set<String>> entry, Object... arguments) throws EntryProcessorException { + Set<String> set = entry.getValue(); + + if (set == null) + set = new HashSet<>(); + + set.add(addVal); + + entry.setValue(set); + + return null; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java index b6bc56e..d1d7c02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java @@ -198,7 +198,7 
@@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest { } // Test optimistic transaction. - GridTestUtils.assertThrows(log, new Callable<Object>() { + GridTestUtils.assertThrowsWithCause(new Callable<Object>() { @Override public Object call() throws Exception { try (Transaction tx = igniteNearOnly.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { nearOnly.put("key", "val"); @@ -208,7 +208,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest { return null; } - }, ClusterTopologyException.class, null); + }, ClusterTopologyCheckedException.class); // Test pessimistic transaction. GridTestUtils.assertThrowsWithCause(new Callable<Object>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java new file mode 100644 index 0000000..44ef20d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(ASYNC); + ccfg.setRebalanceDelay(5000); + ccfg.setBackups(0); + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * Test provokes a scenario in which the primary node sends a force key request to the node that started the transaction. + * + * @throws Exception If failed. 
+ */ + public void testNearTx() throws Exception { + Ignite ignite0 = startGrid(0); + + IgniteCache<Integer, Integer> cache = ignite0.cache(null); + + Ignite ignite1 = startGrid(1); + + final Integer key = 2; + + assertNull(cache.getAndPut(key, key)); + + assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java index 6138022..6ceded3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java @@ -32,6 +32,8 @@ import java.util.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * Test for issue GG-3997 Total Hits and Misses display wrong value for in-memory database. @@ -50,18 +52,18 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - // DiscoverySpi + // DiscoverySpi. 
TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(IP_FINDER); cfg.setDiscoverySpi(disco); // Cache. - cfg.setCacheConfiguration(cacheConfiguration(gridName)); + cfg.setCacheConfiguration(cacheConfiguration()); TransactionConfiguration tCfg = new TransactionConfiguration(); - tCfg.setDefaultTxConcurrency(TransactionConcurrency.PESSIMISTIC); - tCfg.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ); + tCfg.setDefaultTxConcurrency(PESSIMISTIC); + tCfg.setDefaultTxIsolation(REPEATABLE_READ); cfg.setTransactionConfiguration(tCfg); @@ -71,20 +73,18 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac /** * Cache configuration. * - * @param gridName Grid name. * @return Cache configuration. * @throws Exception In case of error. */ - protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + protected CacheConfiguration cacheConfiguration() throws Exception { CacheConfiguration cfg = defaultCacheConfiguration(); + cfg.setCacheMode(PARTITIONED); cfg.setStartSize(700000); cfg.setWriteSynchronizationMode(FULL_ASYNC); cfg.setEvictionPolicy(null); cfg.setBackups(1); cfg.setNearConfiguration(null); - cfg.setRebalanceDelay(-1); - cfg.setBackups(1); cfg.setStatisticsEnabled(true); return cfg; @@ -96,10 +96,10 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac * @throws Exception If failed. 
*/ public void testHitsAndMisses() throws Exception { - assert(GRID_CNT > 0); - startGrids(GRID_CNT); + awaitPartitionMapExchange(); + try { final Ignite g = grid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java index a782aec..bbc56e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java @@ -29,9 +29,7 @@ import static org.apache.ignite.cache.CacheMode.*; */ public class GridCachePartitionedMultiNodeLockSelfTest extends GridCacheMultiNodeLockAbstractTest { /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - + @Override protected CacheConfiguration cacheConfiguration() { CacheConfiguration cc = defaultCacheConfiguration(); cc.setCacheMode(PARTITIONED); @@ -39,9 +37,7 @@ public class GridCachePartitionedMultiNodeLockSelfTest extends GridCacheMultiNod cc.setAtomicityMode(TRANSACTIONAL); cc.setNearConfiguration(new NearCacheConfiguration()); - c.setCacheConfiguration(cc); - - return c; + return cc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java index 70e0ad6..bf3620b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java @@ -32,15 +32,11 @@ public class GridCacheReplicatedMultiNodeLockSelfTest extends GridCacheMultiNode } /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = super.getConfiguration(); - + @Override protected CacheConfiguration cacheConfiguration() { CacheConfiguration cacheCfg = defaultCacheConfiguration(); cacheCfg.setCacheMode(REPLICATED); - cfg.setCacheConfiguration(cacheCfg); - - return cfg; + return cacheCfg; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java index 218b817..7910e41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java @@ -29,6 +29,7 @@ import 
org.apache.ignite.testframework.junits.common.*; import java.util.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -49,14 +50,18 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest { /** Cache atomicity mode specified by test. */ private CacheAtomicityMode atomicityMode; + /** Memory mode. */ + private CacheMemoryMode memMode; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration c = super.getConfiguration(gridName); CacheConfiguration cc = new CacheConfiguration(); - cc.setAtomicityMode(atomicityMode); cc.setCacheMode(PARTITIONED); + cc.setAtomicityMode(atomicityMode); + cc.setMemoryMode(memMode); cc.setWriteSynchronizationMode(PRIMARY_SYNC); cc.setRebalanceMode(SYNC); cc.setStartSize(100); @@ -86,6 +91,17 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest { */ public void testAtomicNearEvictionMaxSize() throws Exception { atomicityMode = ATOMIC; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = ATOMIC; + memMode = CacheMemoryMode.OFFHEAP_TIERED; checkNearEvictionMaxSize(); } @@ -95,6 +111,17 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest { */ public void testTransactionalNearEvictionMaxSize() throws Exception { atomicityMode = TRANSACTIONAL; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTransactionalOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = TRANSACTIONAL; + memMode = CacheMemoryMode.OFFHEAP_TIERED; checkNearEvictionMaxSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java index 0d3c692..6bf343b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -53,6 +54,9 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes /** Cache atomicity mode specified by test. */ private CacheAtomicityMode atomicityMode; + /** Memory mode. 
*/ + private CacheMemoryMode memMode; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -69,8 +73,9 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes else { CacheConfiguration cc = new CacheConfiguration(); - cc.setAtomicityMode(atomicityMode); cc.setCacheMode(cacheMode); + cc.setAtomicityMode(atomicityMode); + cc.setMemoryMode(memMode); cc.setWriteSynchronizationMode(PRIMARY_SYNC); cc.setRebalanceMode(SYNC); cc.setStartSize(100); @@ -92,6 +97,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes public void testPartitionedAtomicNearEvictionMaxSize() throws Exception { atomicityMode = ATOMIC; cacheMode = PARTITIONED; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedAtomicOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = ATOMIC; + cacheMode = PARTITIONED; + memMode = OFFHEAP_TIERED; checkNearEvictionMaxSize(); } @@ -102,6 +119,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes public void testPartitionedTransactionalNearEvictionMaxSize() throws Exception { atomicityMode = TRANSACTIONAL; cacheMode = PARTITIONED; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedTransactionalOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = TRANSACTIONAL; + cacheMode = PARTITIONED; + memMode = OFFHEAP_TIERED; checkNearEvictionMaxSize(); } @@ -112,6 +141,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes public void testReplicatedAtomicNearEvictionMaxSize() throws Exception { atomicityMode = ATOMIC; cacheMode = REPLICATED; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testReplicatedAtomicOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = ATOMIC; + cacheMode = REPLICATED; + memMode = OFFHEAP_TIERED; checkNearEvictionMaxSize(); } @@ -122,6 +163,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes public void testReplicatedTransactionalNearEvictionMaxSize() throws Exception { atomicityMode = TRANSACTIONAL; cacheMode = REPLICATED; + memMode = ONHEAP_TIERED; + + checkNearEvictionMaxSize(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTransactionalOffHeapNearEvictionMaxSize() throws Exception { + atomicityMode = TRANSACTIONAL; + cacheMode = REPLICATED; + memMode = OFFHEAP_TIERED; checkNearEvictionMaxSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java index 35abf7e..65d9f36 100644 --- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java @@ -33,6 +33,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; /** * @@ -65,8 +66,8 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheConfiguration(ccfg); } @@ -79,16 +80,31 @@ public class 
OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void testClassCacheUpdateFailover() throws Exception { + public void testClassCacheUpdateFailover1() throws Exception { + classCacheUpdateFailover(false); + } + + /** + * @throws Exception If failed. + */ + public void testClassCacheUpdateFailover2() throws Exception { + classCacheUpdateFailover(true); + } + + /** + * @param stopSrv If {@code true} restarts server node, otherwise client node. + * @throws Exception If failed. + */ + private void classCacheUpdateFailover(boolean stopSrv) throws Exception { cache = true; startGridsMultiThreaded(2); - cache = false; + cache = stopSrv; IgniteCache<Integer, Object> cache0 = ignite(0).cache(null); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 20; i++) { log.info("Iteration: " + i); Map<Integer, Object> map = new HashMap<>(); @@ -106,7 +122,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest } }); - cache0.putAll(map); // Do not stop cache node, so put should not fail. 
+ cache0.putAll(map); fut.get(); } @@ -210,6 +226,26 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest case 9: return new TestClass9(); case 10: return new TestClass10(); + + case 11: return new TestClass11(); + + case 12: return new TestClass12(); + + case 13: return new TestClass13(); + + case 14: return new TestClass14(); + + case 15: return new TestClass15(); + + case 16: return new TestClass16(); + + case 17: return new TestClass17(); + + case 18: return new TestClass18(); + + case 19: return new TestClass19(); + + case 20: return new TestClass20(); } fail(); @@ -221,6 +257,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest * */ static class TestClass1 implements Serializable { + /** */ int val; } @@ -268,4 +305,54 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest * */ static class TestClass10 implements Serializable {} + + /** + * + */ + static class TestClass11 implements Serializable {} + + /** + * + */ + static class TestClass12 implements Serializable {} + + /** + * + */ + static class TestClass13 implements Serializable {} + + /** + * + */ + static class TestClass14 implements Serializable {} + + /** + * + */ + static class TestClass15 implements Serializable {} + + /** + * + */ + static class TestClass16 implements Serializable {} + + /** + * + */ + static class TestClass17 implements Serializable {} + + /** + * + */ + static class TestClass18 implements Serializable {} + + /** + * + */ + static class TestClass19 implements Serializable {} + + /** + * + */ + static class TestClass20 implements Serializable {} } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index af2b85c..b64471b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -80,6 +80,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSizeFailoverTest.class); + suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java index 97c558a..f3fac23 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java @@ -46,6 +46,8 @@ public class IgniteCacheFailoverTestSuite2 { suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class); suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class); + suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 2efdb82..4926590 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -140,6 +140,11 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class)); suite.addTest(new TestSuite(IgniteCacheServerNodeConcurrentStart.class)); + suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class)); + suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class)); + suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class)); + suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class)); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java ---------------------------------------------------------------------- diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java index 31466b5..eb447b9 100644 --- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java +++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java @@ -46,12 +46,49 @@ public class DbMetadataReader { } /** + * Get specified dialect object for selected database. + * + * @param conn Connection to database. + * @return Specific dialect object. 
+ */ + private DatabaseMetadataDialect dialect(Connection conn) { + try { + String dbProductName = conn.getMetaData().getDatabaseProductName(); + + if ("Oracle".equals(dbProductName)) + return new OracleMetadataDialect(); + else if (dbProductName.startsWith("DB2/")) + return new DB2MetadataDialect(); + else if (dbProductName.equals("MySQL")) + return new MySQLMetadataDialect(); + else + return new JdbcMetadataDialect(); + } + catch (SQLException e) { + log.log(Level.SEVERE, "Failed to resolve dialect (JdbcMetaDataDialect will be used.", e); + + return new JdbcMetadataDialect(); + } + } + /** + * Get list of schemas from database. + * + * @param conn Connection to database. + * @return List of schema names. + * @throws SQLException If schemas loading failed. + */ + public List<String> schemas(Connection conn) throws SQLException { + return dialect(conn).schemas(conn); + } + + /** * Extract DB metadata. * * @param conn Connection. + * @param schemas List of database schemas to process. In case of empty list all schemas will be processed. * @param tblsOnly Tables only flag. 
*/ - public Collection<DbTable> extractMetadata(Connection conn, boolean tblsOnly) throws SQLException { + public Collection<DbTable> extractMetadata(Connection conn, List<String> schemas, boolean tblsOnly) throws SQLException { DatabaseMetadataDialect dialect; try { @@ -70,7 +107,7 @@ public class DbMetadataReader { dialect = new JdbcMetadataDialect(); } - return dialect.tables(conn, tblsOnly); + return dialect.tables(conn, schemas, tblsOnly); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java ---------------------------------------------------------------------- diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java index 17eb8b2..15063e2 100644 --- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java +++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java @@ -25,6 +25,7 @@ import java.util.*; public class DB2MetadataDialect extends JdbcMetadataDialect { /** {@inheritDoc} */ @Override public Set<String> systemSchemas() { - return new HashSet<>(Arrays.asList("SYSIBM", "SYSCAT", "SYSSTAT", "SYSTOOLS")); + return new HashSet<>(Arrays.asList("SYSIBM", "SYSCAT", "SYSSTAT", "SYSTOOLS", "SYSFUN", "SYSIBMADM", + "SYSIBMINTERNAL", "SYSIBMTS", "SYSPROC", "SYSPUBLIC")); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java ---------------------------------------------------------------------- diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java 
b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java index 0d17567..9c059b8 100644 --- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java +++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java @@ -27,14 +27,25 @@ import java.util.*; */ public abstract class DatabaseMetadataDialect { /** + * Gets schemas from database. + * + * @param conn Database connection. + * @return List of schema names. + * @throws SQLException If failed to get schemas. + */ + public abstract List<String> schemas(Connection conn) throws SQLException; + + /** * Gets tables from database. * * @param conn Database connection. + * @param schemas Collection of schema names to load. * @param tblsOnly If {@code true} then gets only tables otherwise gets tables and views. * @return Collection of table descriptors. * @throws SQLException If failed to get tables. */ - public abstract Collection<DbTable> tables(Connection conn, boolean tblsOnly) throws SQLException; + public abstract Collection<DbTable> tables(Connection conn, List<String> schemas, boolean tblsOnly) + throws SQLException; /** * @return Collection of database system schemas. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java ---------------------------------------------------------------------- diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java index ab65e7a..1bb6840 100644 --- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java +++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java @@ -63,76 +63,117 @@ public class JdbcMetadataDialect extends DatabaseMetadataDialect { private static final int IDX_ASC_OR_DESC_IDX = 10; /** {@inheritDoc} */ - @Override public Collection<DbTable> tables(Connection conn, boolean tblsOnly) throws SQLException { + @Override public List<String> schemas(Connection conn) throws SQLException { + List<String> schemas = new ArrayList<>(); + + ResultSet rs = conn.getMetaData().getSchemas(); + + Set<String> sys = systemSchemas(); + + while(rs.next()) { + String schema = rs.getString(1); + + // Skip system schemas. + if (sys.contains(schema)) + continue; + + schemas.add(schema); + } + + return schemas; + } + + /** + * @return If {@code true} use catalogs for table division. + */ + protected boolean useCatalog() { + return false; + } + + /** + * @return If {@code true} use schemas for table division. + */ + protected boolean useSchema() { + return true; + } + + /** {@inheritDoc} */ + @Override public Collection<DbTable> tables(Connection conn, List<String> schemas, boolean tblsOnly) + throws SQLException { DatabaseMetaData dbMeta = conn.getMetaData(); Set<String> sys = systemSchemas(); Collection<DbTable> tbls = new ArrayList<>(); - try (ResultSet tblsRs = dbMeta.getTables(null, null, "%", - tblsOnly ? 
TABLES_ONLY : TABLES_AND_VIEWS)) { - while (tblsRs.next()) { - String tblCatalog = tblsRs.getString(TBL_CATALOG_IDX); - String tblSchema = tblsRs.getString(TBL_SCHEMA_IDX); - String tblName = tblsRs.getString(TBL_NAME_IDX); + if (schemas.size() == 0) + schemas.add(null); - // In case of MySql we should use catalog. - String schema = tblSchema != null ? tblSchema : tblCatalog; + for (String toSchema: schemas) { + try (ResultSet tblsRs = dbMeta.getTables(useCatalog() ? toSchema : null, useSchema() ? toSchema : null, "%", + tblsOnly ? TABLES_ONLY : TABLES_AND_VIEWS)) { + while (tblsRs.next()) { + String tblCatalog = tblsRs.getString(TBL_CATALOG_IDX); + String tblSchema = tblsRs.getString(TBL_SCHEMA_IDX); + String tblName = tblsRs.getString(TBL_NAME_IDX); - // Skip system schemas. - if (sys.contains(schema)) - continue; + // In case of MySql we should use catalog. + String schema = tblSchema != null ? tblSchema : tblCatalog; - Set<String> pkCols = new HashSet<>(); + // Skip system schemas. + if (sys.contains(schema)) + continue; - try (ResultSet pkRs = dbMeta.getPrimaryKeys(tblCatalog, tblSchema, tblName)) { - while (pkRs.next()) - pkCols.add(pkRs.getString(PK_COL_NAME_IDX)); - } + Set<String> pkCols = new HashSet<>(); + + try (ResultSet pkRs = dbMeta.getPrimaryKeys(tblCatalog, tblSchema, tblName)) { + while (pkRs.next()) + pkCols.add(pkRs.getString(PK_COL_NAME_IDX)); + } - List<DbColumn> cols = new ArrayList<>(); + List<DbColumn> cols = new ArrayList<>(); - try (ResultSet colsRs = dbMeta.getColumns(tblCatalog, tblSchema, tblName, null)) { - while (colsRs.next()) { - String colName = colsRs.getString(COL_NAME_IDX); + try (ResultSet colsRs = dbMeta.getColumns(tblCatalog, tblSchema, tblName, null)) { + while (colsRs.next()) { + String colName = colsRs.getString(COL_NAME_IDX); - cols.add(new DbColumn( - colName, - colsRs.getInt(COL_DATA_TYPE_IDX), - pkCols.contains(colName), - colsRs.getInt(COL_NULLABLE_IDX) == DatabaseMetaData.columnNullable)); + cols.add(new DbColumn( + 
colName, + colsRs.getInt(COL_DATA_TYPE_IDX), + pkCols.contains(colName), + colsRs.getInt(COL_NULLABLE_IDX) == DatabaseMetaData.columnNullable)); + } } - } - Map<String, Map<String, Boolean>> idxs = new LinkedHashMap<>(); + Map<String, Map<String, Boolean>> idxs = new LinkedHashMap<>(); - try (ResultSet idxRs = dbMeta.getIndexInfo(tblCatalog, tblSchema, tblName, false, true)) { - while (idxRs.next()) { - String idxName = idxRs.getString(IDX_NAME_IDX); + try (ResultSet idxRs = dbMeta.getIndexInfo(tblCatalog, tblSchema, tblName, false, true)) { + while (idxRs.next()) { + String idxName = idxRs.getString(IDX_NAME_IDX); - String colName = idxRs.getString(IDX_COL_NAME_IDX); + String colName = idxRs.getString(IDX_COL_NAME_IDX); - if (idxName == null || colName == null) - continue; + if (idxName == null || colName == null) + continue; - Map<String, Boolean> idx = idxs.get(idxName); + Map<String, Boolean> idx = idxs.get(idxName); - if (idx == null) { - idx = new LinkedHashMap<>(); + if (idx == null) { + idx = new LinkedHashMap<>(); - idxs.put(idxName, idx); - } + idxs.put(idxName, idx); + } - String askOrDesc = idxRs.getString(IDX_ASC_OR_DESC_IDX); + String askOrDesc = idxRs.getString(IDX_ASC_OR_DESC_IDX); - Boolean desc = askOrDesc != null ? "D".equals(askOrDesc) : null; + Boolean desc = askOrDesc != null ? 
"D".equals(askOrDesc) : null; - idx.put(colName, desc); + idx.put(colName, desc); + } } - } - tbls.add(table(schema, tblName, cols, idxs)); + tbls.add(table(schema, tblName, cols, idxs)); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java ---------------------------------------------------------------------- diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java new file mode 100644 index 0000000..b592321 --- /dev/null +++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.schema.parser.dialect; + +import java.sql.*; +import java.util.*; + +/** + * MySQL specific metadata dialect. 
+ */
+public class MySQLMetadataDialect extends JdbcMetadataDialect {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Collects the catalog names reported by {@link DatabaseMetaData#getCatalogs()},
+     * leaving out every name contained in {@code systemSchemas()}.
+     *
+     * @param conn Connection whose metadata is queried.
+     * @return Catalog names that are not listed as system schemas.
+     * @throws SQLException If reading the catalog list fails.
+     */
+    @Override public List<String> schemas(Connection conn) throws SQLException {
+        List<String> schemas = new ArrayList<>();
+
+        Set<String> sys = systemSchemas();
+
+        // try-with-resources closes the result set on every exit path,
+        // including when next() or getString() throws.
+        try (ResultSet rs = conn.getMetaData().getCatalogs()) {
+            while (rs.next()) {
+                // First column of the catalogs result set holds the catalog name.
+                String schema = rs.getString(1);
+
+                // Skip system schemas.
+                if (sys.contains(schema))
+                    continue;
+
+                schemas.add(schema);
+            }
+        }
+
+        return schemas;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code true}: the names produced by {@link #schemas(Connection)} are catalog names.
+     */
+    @Override protected boolean useCatalog() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code false}: this dialect works with catalog names rather than schema names.
+     */
+    @Override protected boolean useSchema() {
+        return false;
+    }
+}
