http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimarySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimarySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimarySelfTest.java new file mode 100644 index 0000000..39c4cd5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsPrimarySelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import static org.apache.ignite.igfs.IgfsMode.*; + +/** + * Tests for PRIMARY mode. + */ +public class IgfsPrimarySelfTest extends IgfsAbstractSelfTest { + /** + * Constructor. + */ + public IgfsPrimarySelfTest() { + super(PRIMARY); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java new file mode 100644 index 0000000..7f3b249 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java @@ -0,0 +1,978 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.commons.io.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.security.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.nio.charset.StandardCharsets.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests for {@link IgfsProcessor}. + */ +public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest { + /** Test IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Meta-information cache name. */ + private static final String META_CACHE_NAME = "replicated"; + + /** Data cache name. */ + public static final String DATA_CACHE_NAME = "data"; + + /** Random numbers generator. */ + protected final SecureRandom rnd = new SecureRandom(); + + /** File system. */ + protected IgniteFs ggfs; + + /** Meta cache. */ + private GridCache<Object, Object> metaCache; + + /** Meta cache name. */ + private String metaCacheName; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + IgniteEx grid = grid(0); + + ggfs = grid.fileSystem(ggfsName()); + + IgfsConfiguration[] cfgs = grid.configuration().getGgfsConfiguration(); + + assert cfgs.length == 1; + + metaCacheName = cfgs[0].getMetaCacheName(); + + metaCache = grid.cachex(metaCacheName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + ggfs.format(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(nodesCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + IgfsConfiguration ggfsCfg = new IgfsConfiguration(); + + ggfsCfg.setMetaCacheName(META_CACHE_NAME); + ggfsCfg.setDataCacheName(DATA_CACHE_NAME); + ggfsCfg.setName("ggfs"); + + cfg.setGgfsConfiguration(ggfsCfg); + + return cfg; + } + + /** {@inheritDoc} */ + protected CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(cacheName); + + if (META_CACHE_NAME.equals(cacheName)) + cacheCfg.setCacheMode(REPLICATED); + else { + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + + cacheCfg.setBackups(0); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + } + + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setQueryIndexEnabled(false); + + return cacheCfg; + } + + /** @return Test nodes count. */ + public int nodesCount() { + return 1; + } + + /** @return FS name. */ + public String ggfsName() { + return "ggfs"; + } + + /** @throws Exception If failed. */ + public void testGgfsEnabled() throws Exception { + IgniteFs ggfs = grid(0).fileSystem(ggfsName()); + + assertNotNull(ggfs); + } + + /** + * Test properties management in meta-cache. + * + * @throws Exception If failed. + */ + public void testUpdateProperties() throws Exception { + IgfsPath p = path("/tmp/my"); + + ggfs.mkdirs(p); + + Map<String, String> oldProps = ggfs.info(p).properties(); + + ggfs.update(p, F.asMap("a", "1")); + ggfs.update(p, F.asMap("b", "2")); + + assertEquals("1", ggfs.info(p).property("a")); + assertEquals("2", ggfs.info(p).property("b")); + + ggfs.update(p, F.asMap("b", "3")); + + Map<String, String> expProps = new HashMap<>(oldProps); + expProps.put("a", "1"); + expProps.put("b", "3"); + + assertEquals("3", ggfs.info(p).property("b")); + assertEquals(expProps, ggfs.info(p).properties()); + assertEquals("5", ggfs.info(p).property("c", "5")); + + assertUpdatePropertiesFails(null, null, NullPointerException.class, "Ouch! Argument cannot be null"); + assertUpdatePropertiesFails(p, null, NullPointerException.class, "Ouch! Argument cannot be null"); + assertUpdatePropertiesFails(null, F.asMap("x", "9"), NullPointerException.class, + "Ouch! Argument cannot be null"); + + assertUpdatePropertiesFails(p, Collections.<String, String>emptyMap(), IllegalArgumentException.class, + "Ouch! Argument is invalid"); + } + + /** @throws Exception If failed. */ + public void testCreate() throws Exception { + IgfsPath path = path("/file"); + + try (IgfsOutputStream os = ggfs.create(path, false)) { + assert os != null; + + IgfsFileImpl info = (IgfsFileImpl)ggfs.info(path); + + for (int i = 0; i < nodesCount(); i++) { + IgfsFileInfo fileInfo = (IgfsFileInfo)grid(i).cachex(metaCacheName).peek(info.fileId()); + + assertNotNull(fileInfo); + assertNotNull(fileInfo.listing()); + } + } + finally { + ggfs.delete(path("/"), true); + } + } + + /** + * Test make directories. + * + * @throws Exception In case of any exception. + */ + public void testMakeListDeleteDirs() throws Exception { + assertListDir("/"); + + ggfs.mkdirs(path("/ab/cd/ef")); + + assertListDir("/", "ab"); + assertListDir("/ab", "cd"); + assertListDir("/ab/cd", "ef"); + + ggfs.mkdirs(path("/ab/ef")); + ggfs.mkdirs(path("/cd/ef")); + ggfs.mkdirs(path("/cd/gh")); + ggfs.mkdirs(path("/ef")); + ggfs.mkdirs(path("/ef/1")); + ggfs.mkdirs(path("/ef/2")); + ggfs.mkdirs(path("/ef/3")); + + assertListDir("/", "ef", "ab", "cd"); + assertListDir("/ab", "cd", "ef"); + assertListDir("/ab/cd", "ef"); + assertListDir("/ab/cd/ef"); + assertListDir("/cd", "ef", "gh"); + assertListDir("/cd/ef"); + assertListDir("/ef", "1", "2", "3"); + + ggfs.delete(path("/ef/2"), false); + + assertListDir("/", "ef", "ab", "cd"); + assertListDir("/ef", "1", "3"); + + // Delete should return false for non-existing paths. + assertFalse(ggfs.delete(path("/ef/2"), false)); + + assertListDir("/", "ef", "ab", "cd"); + assertListDir("/ef", "1", "3"); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.delete(path("/ef"), false); + + return null; + } + }, IgfsDirectoryNotEmptyException.class, null); + + assertListDir("/", "ef", "ab", "cd"); + assertListDir("/ef", "1", "3"); + + ggfs.delete(path("/ef"), true); + + assertListDir("/", "ab", "cd"); + } + + /** + * Test make directories in multi-threaded environment. + * + * @throws Exception In case of any exception. + */ + @SuppressWarnings("TooBroadScope") + public void testMakeListDeleteDirsMultithreaded() throws Exception { + assertListDir("/"); + + final int max = 2 * 1000; + final int threads = 50; + final AtomicInteger cnt = new AtomicInteger(); + + info("Create directories: " + max); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int cur = cnt.incrementAndGet(); cur < max; cur = cnt.incrementAndGet()) + ggfs.mkdirs(path(cur)); + + return null; + } + }, threads, "grid-test-make-directories"); + + info("Validate directories were created."); + + cnt.set(0); // Reset counter. + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int cur = cnt.incrementAndGet(); cur < max; cur = cnt.incrementAndGet()) { + IgfsFile info = ggfs.info(path(cur)); + + assertNotNull("Expects file exist: " + cur, info); + assertTrue("Expects file is a directory: " + cur, info.isDirectory()); + } + + return null; + } + }, threads, "grid-test-check-directories-exist"); + + info("Validate directories removing."); + + cnt.set(0); // Reset counter. + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int cur = cnt.incrementAndGet(); cur < max; cur = cnt.incrementAndGet()) + ggfs.delete(path(cur), true); + + return null; + } + }, threads, "grid-test-delete-directories"); + } + + /** @throws Exception If failed. */ + public void testBasicOps() throws Exception { + // Create directories. + ggfs.mkdirs(path("/A/B1/C1")); + + for (Object key : metaCache.keySet()) + info("Entry in cache [key=" + key + ", val=" + metaCache.get(key) + ']'); + + ggfs.mkdirs(path("/A/B1/C2")); + ggfs.mkdirs(path("/A/B1/C3")); + ggfs.mkdirs(path("/A/B2/C1")); + ggfs.mkdirs(path("/A/B2/C2")); + + ggfs.mkdirs(path("/A1/B1/C1")); + ggfs.mkdirs(path("/A1/B1/C2")); + ggfs.mkdirs(path("/A1/B1/C3")); + ggfs.mkdirs(path("/A2/B2/C1")); + ggfs.mkdirs(path("/A2/B2/C2")); + + for (Object key : metaCache.keySet()) + info("Entry in cache [key=" + key + ", val=" + metaCache.get(key) + ']'); + + // Check existence. + assert ggfs.exists(path("/A/B1/C1")); + + // List items. + Collection<IgfsPath> paths = ggfs.listPaths(path("/")); + + assert paths.size() == 3 : "Unexpected paths: " + paths; + + paths = ggfs.listPaths(path("/A")); + + assert paths.size() == 2 : "Unexpected paths: " + paths; + + paths = ggfs.listPaths(path("/A/B1")); + + assert paths.size() == 3 : "Unexpected paths: " + paths; + + // Delete. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.delete(path("/"), false); + + return null; + } + }, IgfsException.class, null); + + ggfs.delete(path("/A1/B1/C1"), false); + assertNull(ggfs.info(path("/A1/B1/C1"))); + + ggfs.delete(path("/A1/B1/C2"), false); + assertNull(ggfs.info(path("/A1/B1/C2"))); + + ggfs.delete(path("/A1/B1/C3"), false); + assertNull(ggfs.info(path("/A1/B1/C3"))); + + assertEquals(Collections.<IgfsPath>emptyList(), ggfs.listPaths(path("/A1/B1"))); + + ggfs.delete(path("/A2/B2"), true); + assertNull(ggfs.info(path("/A2/B2"))); + + assertEquals(Collections.<IgfsPath>emptyList(), ggfs.listPaths(path("/A2"))); + + assertEquals(Arrays.asList(path("/A"), path("/A1"), path("/A2")), sorted(ggfs.listPaths(path("/")))); + + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.delete(path("/"), false); + + return null; + } + }, IgfsException.class, null); + assertEquals(Arrays.asList(path("/A"), path("/A1"), path("/A2")), sorted(ggfs.listPaths(path("/")))); + + ggfs.delete(path("/"), true); + assertEquals(Collections.<IgfsPath>emptyList(), ggfs.listPaths(path("/"))); + + ggfs.delete(path("/"), false); + assertEquals(Collections.<IgfsPath>emptyList(), ggfs.listPaths(path("/"))); + + for (Cache.Entry<Object, Object> e : metaCache) + info("Entry in cache [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + + /** + * Ensure correct size calculation. + * + * @throws Exception If failed. + */ + public void testSize() throws Exception { + IgfsPath dir1 = path("/dir1"); + IgfsPath subDir1 = path("/dir1/subdir1"); + IgfsPath dir2 = path("/dir2"); + + IgfsPath fileDir1 = path("/dir1/file"); + IgfsPath fileSubdir1 = path("/dir1/subdir1/file"); + IgfsPath fileDir2 = path("/dir2/file"); + + IgfsOutputStream os = ggfs.create(fileDir1, false); + os.write(new byte[1000]); + os.close(); + + os = ggfs.create(fileSubdir1, false); + os.write(new byte[2000]); + os.close(); + + os = ggfs.create(fileDir2, false); + os.write(new byte[4000]); + os.close(); + + assert ggfs.size(fileDir1) == 1000; + assert ggfs.size(fileSubdir1) == 2000; + assert ggfs.size(fileDir2) == 4000; + + assert ggfs.size(dir1) == 3000; + assert ggfs.size(subDir1) == 2000; + + assert ggfs.size(dir2) == 4000; + } + + /** + * Convert collection into sorted list. + * + * @param col Unsorted collection. + * @return Sorted collection. + */ + private <T extends Comparable<T>> List<T> sorted(Collection<T> col) { + List<T> list = new ArrayList<>(col); + + Collections.sort(list); + + return list; + } + + /** @throws Exception If failed. */ + public void testRename() throws Exception { + // Create directories. + ggfs.mkdirs(path("/A/B1/C1")); + + for (Object key : metaCache.keySet()) + info("Entry in cache [key=" + key + ", val=" + metaCache.get(key) + ']'); + + // Move under itself. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.rename(path("/A/B1/C1"), path("/A/B1/C1/C2")); + + return null; + } + }, IgfsException.class, null); + + // Move under itself. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.rename(path("/A/B1/C1"), path("/A/B1/C1/D/C2")); + + return null; + } + }, IgfsException.class, null); + + // Move under itself. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override + public Object call() throws Exception { + ggfs.rename(path("/A/B1/C1"), path("/A/B1/C1/D/E/C2")); + + return null; + } + }, IgfsException.class, null); + + /// + // F6 > Enter > Tab x N times + // "I like to move it, move it..." + // + + Collection<IgniteBiTuple<String, String>> chain = Arrays.asList( + F.t("/A/B1/C1", "/A/B1/C2"), + F.t("/A/B1", "/A/B2"), + F.t("/A", "/Q"), + //F.t("/Q/B2/C2", "/C3"), + F.t("/Q/B2/C2", "/Q/B2/C1"), + F.t("/Q/B2", "/Q/B1"), + F.t("/Q", "/A"), + //F.t("/C3", "/A/B1/C1") + F.t("/A/B1/C1", "/"), + F.t("/C1", "/A/B1") + ); + + final IgfsPath root = path("/"); + + for (IgniteBiTuple<String, String> e : chain) { + final IgfsPath p1 = path(e.get1()); + final IgfsPath p2 = path(e.get2()); + + assertTrue("Entry: " + e, ggfs.exists(p1)); + ggfs.rename(p1, p2); + assertFalse("Entry: " + e, ggfs.exists(p1)); + assertTrue("Entry: " + e, ggfs.exists(p2)); + + // Test root rename. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.rename(root, p1); + + return null; + } + }, IgfsException.class, null); + + // Test root rename. + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.rename(p1, root); + + return null; + } + }, IgfsException.class, null); + + // Test root rename. + if (!root.equals(p2)) { + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.rename(root, p2); + + return null; + } + }, IgfsException.class, null); + } + + // Test same rename. + ggfs.rename(p1, p1); + ggfs.rename(p2, p2); + } + + // List items. + assertEquals(Arrays.asList(path("/A")), sorted(ggfs.listPaths(root))); + assertEquals(Arrays.asList(path("/A/B1")), sorted(ggfs.listPaths(path("/A")))); + assertEquals(Arrays.asList(path("/A/B1/C1")), sorted(ggfs.listPaths(path("/A/B1")))); + + String text = "Test long number: " + rnd.nextLong(); + + // Create file. + assertEquals(text, create("/A/a", false, text)); + + // Validate renamed during reading. + + try (IgfsInputStream in0 = ggfs.open(path("/A/a"))) { + // Rename file. + ggfs.rename(path("/A/a"), path("/b")); + + assertEquals(text, IOUtils.toString(in0, UTF_8)); + } + + // Validate after renamed. + assertOpenFails("/A/a", "File not found"); + assertEquals(text, read("/b")); + + // Cleanup. + ggfs.delete(root, true); + + assertEquals(Collections.<IgfsPath>emptyList(), ggfs.listPaths(root)); + } + + /** + * @param path Path. + * @return GGFS path. + */ + private IgfsPath path(String path) { + assert path != null; + + return new IgfsPath(path); + } + + /** + * @param i Path index. + * @return GGFS path. + */ + private IgfsPath path(long i) { + //return path(String.format("/%d", i)); + return path(String.format("/%d/q/%d/%d", i % 10, (i / 10) % 10, i)); + } + + /** @throws Exception If failed. */ + public void testCreateOpenAppend() throws Exception { + // Error - path points to root directory. + assertCreateFails("/", false, "Failed to resolve parent directory"); + + // Create directories. + ggfs.mkdirs(path("/A/B1/C1")); + + // Error - path points to directory. + for (String path : Arrays.asList("/A", "/A/B1", "/A/B1/C1")) { + assertCreateFails(path, false, "Failed to create file (file already exists)"); + assertCreateFails(path, true, "Failed to create file (path points to a directory)"); + assertAppendFails(path, false, "Failed to open file (not a file)"); + assertAppendFails(path, true, "Failed to open file (not a file)"); + assertOpenFails(path, "Failed to open file (not a file)"); + } + + String text1 = "Test long number #1: " + rnd.nextLong(); + String text2 = "Test long number #2: " + rnd.nextLong(); + + // Error - parent does not exist. + for (String path : Arrays.asList("/A/a", "/A/B1/a", "/A/B1/C1/a")) { + // Error - file doesn't exist. + assertOpenFails(path, "File not found"); + assertAppendFails(path, false, "File not found"); + + // Create new and write. + assertEquals(text1, create(path, false, text1)); + + // Error - file already exists. + assertCreateFails(path, false, "Failed to create file (file already exists)"); + + // Overwrite existent. + assertEquals(text2, create(path, true, text2)); + + // Append text. + assertEquals(text2 + text1, append(path, false, text1)); + + // Append text. + assertEquals(text2 + text1 + text2, append(path, true, text2)); + + // Delete this file. + ggfs.delete(path(path), true); + + // Error - file doesn't exist. + assertOpenFails(path, "File not found"); + assertAppendFails(path, false, "File not found"); + + // Create with append. + assertEquals(text1, append(path, true, text1)); + + // Append. + for (String full = text1, cur = ""; full.length() < 10000; cur = ", long=" + rnd.nextLong()) + assertEquals(full += cur, append(path, rnd.nextBoolean(), cur)); + + ggfs.delete(path(path), false); + } + } + + /** @throws Exception If failed. */ + @SuppressWarnings("BusyWait") + public void testDeleteCacheConsistency() throws Exception { + IgfsPath path = new IgfsPath("/someFile"); + + try (IgfsOutputStream out = ggfs.create(path, true)) { + out.write(new byte[10 * 1024 * 1024]); + } + + IgniteUuid fileId = U.field(ggfs.info(path), "fileId"); + + GridCache<IgniteUuid, IgfsFileInfo> metaCache = grid(0).cachex(META_CACHE_NAME); + GridCache<IgfsBlockKey, byte[]> dataCache = grid(0).cachex(DATA_CACHE_NAME); + + IgfsFileInfo info = metaCache.get(fileId); + + assertNotNull(info); + assertTrue(info.isFile()); + assertNotNull(metaCache.get(info.id())); + + IgfsDataManager dataMgr = ((IgfsEx)ggfs).context().data(); + + for (int i = 0; i < info.blocksCount(); i++) + assertNotNull(dataCache.get(dataMgr.blockKey(i, info))); + + ggfs.delete(path, true); + + for (int i = 0; i < 25; i++) { + if (metaCache.get(info.id()) == null) + break; + + U.sleep(100); + } + + assertNull(metaCache.get(info.id())); + + for (int i = 0; i < 10; i++) { + boolean doBreak = true; + + for (int j = 0; j < info.blocksCount(); j++) { + if (dataCache.get(dataMgr.blockKey(i, info)) != null) { + doBreak = false; + + break; + } + } + + if (doBreak) + break; + else + Thread.sleep(100); + } + + for (int i = 0; i < info.blocksCount(); i++) + assertNull(dataCache.get(new IgfsBlockKey(info.id(), null, false, i))); + } + + /** @throws Exception If failed. */ + public void testCreateAppendLongData1() throws Exception { + checkCreateAppendLongData(123, 1024, 100); + } + + /** @throws Exception If failed. */ + public void testCreateAppendLongData2() throws Exception { + checkCreateAppendLongData(123 + 1024, 1024, 100); + } + + /** @throws Exception If failed. */ + public void testCreateAppendLongData3() throws Exception { + checkCreateAppendLongData(123, 1024, 1000); + } + + /** @throws Exception If failed. */ + public void testCreateAppendLongData4() throws Exception { + checkCreateAppendLongData(123 + 1024, 1024, 1000); + } + + /** + * Test format operation on non-empty file system. + * + * @throws Exception If failed. + */ + public void testFormatNonEmpty() throws Exception { + String dirPath = "/A/B/C"; + + ggfs.mkdirs(path(dirPath)); + + String filePath = "/someFile"; + + create(filePath, false, "Some text."); + + ggfs.format(); + + assert !ggfs.exists(path(dirPath)); + assert !ggfs.exists(path(filePath)); + + assert grid(0).cachex(ggfs.configuration().getMetaCacheName()).size() == 2; // ROOT + TRASH. + } + + /** + * Test format operation on empty file system. + * + * @throws Exception If failed. + */ + public void testFormatEmpty() throws Exception { + ggfs.format(); + } + + /** + * @param chunkSize Chunk size. + * @param bufSize Buffer size. + * @param cnt Count. + * @throws Exception If failed. + */ + private void checkCreateAppendLongData(int chunkSize, int bufSize, int cnt) throws Exception { + IgfsPath path = new IgfsPath("/someFile"); + + byte[] buf = new byte[chunkSize]; + + for (int i = 0; i < buf.length; i++) + buf[i] = (byte)(i * i); + + IgfsOutputStream os = ggfs.create(path, bufSize, true, null, 0, 1024, null); + + try { + for (int i = 0; i < cnt; i++) + os.write(buf); + + os.flush(); + } + finally { + os.close(); + } + + os = ggfs.append(path, chunkSize, false, null); + + try { + for (int i = 0; i < cnt; i++) + os.write(buf); + + os.flush(); + } + finally { + os.close(); + } + + byte[] readBuf = new byte[chunkSize]; + + try (IgfsInputStream in = ggfs.open(path)) { + long pos = 0; + + for (int k = 0; k < 2 * cnt; k++) { + in.readFully(pos, readBuf); + + for (int i = 0; i < readBuf.length; i++) + assertEquals(buf[i], readBuf[i]); + + pos += readBuf.length; + } + } + } + + /** + * Create file and write specified text to. + * + * @param path File path to create. + * @param overwrite Overwrite file if it already exists. + * @param text Text to write into file. + * @return Content of this file. + * @throws IgniteCheckedException In case of error. + */ + private String create(String path, boolean overwrite, String text) throws Exception { + + try (IgfsOutputStream out = ggfs.create(path(path), overwrite)) { + IOUtils.write(text, out, UTF_8); + } + + assertNotNull(ggfs.info(path(path))); + + return read(path); + } + + /** + * Appent text to the file. + * + * @param path File path to create. + * @param create Create file if it doesn't exist yet. + * @param text Text to append to file. + * @return Content of this file. + * @throws IgniteCheckedException In case of error. + */ + private String append(String path, boolean create, String text) throws Exception { + + try (IgfsOutputStream out = ggfs.append(path(path), create)) { + IOUtils.write(text, out, UTF_8); + } + + assertNotNull(ggfs.info(path(path))); + + return read(path); + } + + /** + * Read content of the file. + * + * @param path File path to read. + * @return Content of this file. + * @throws IgniteCheckedException In case of error. + */ + private String read(String path) throws Exception { + + try (IgfsInputStream in = ggfs.open(path(path))) { + return IOUtils.toString(in, UTF_8); + } + } + + /** + * Test expected failures for 'update properties' operation. + * + * @param path Path to the file. + * @param props File properties to set. + * @param msg Failure message if expected exception was not thrown. + */ + private void assertUpdatePropertiesFails(@Nullable final IgfsPath path, + @Nullable final Map<String, String> props, + Class<? extends Throwable> cls, @Nullable String msg) { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return ggfs.update(path, props); + } + }, cls, msg); + } + + /** + * Test expected failures for 'create' operation. + * + * @param path File path to create. + * @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existent directory. + * @param msg Failure message if expected exception was not thrown. + */ + private void assertCreateFails(final String path, final boolean overwrite, @Nullable String msg) { + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.create(path(path), overwrite); + + return false; + } + }, IgfsException.class, msg); + } + + /** + * Test expected failures for 'append' operation. + * + * @param path File path to append. + * @param create Create file if it doesn't exist yet. + * @param msg Failure message if expected exception was not thrown. + */ + private void assertAppendFails(final String path, final boolean create, @Nullable String msg) { + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.append(path(path), create); + + return false; + } + }, IgfsException.class, msg); + } + + /** + * Test expected failures for 'open' operation. + * + * @param path File path to read. + * @param msg Failure message if expected exception was not thrown. + */ + private void assertOpenFails(final String path, @Nullable String msg) { + GridTestUtils.assertThrowsInherited(log, new Callable<Object>() { + @Override public Object call() throws Exception { + ggfs.open(path(path)); + + return false; + } + }, IgniteException.class, msg); + } + + /** + * Validate directory listing. + * + * @param path Directory path to validate listing for. + * @param item List of directory items. + * @throws IgniteCheckedException If failed. + */ + private void assertListDir(String path, String... item) throws IgniteCheckedException { + Collection<IgfsFile> files = ggfs.listFiles(new IgfsPath(path)); + + List<String> names = new ArrayList<>(item.length); + + for (IgfsFile file : files) + names.add(file.path().name()); + + Arrays.sort(item); + Collections.sort(names); + + assertEquals(Arrays.asList(item), names); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java new file mode 100644 index 0000000..7cd254d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import java.lang.reflect.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.igfs.IgfsMode.*; + +/** + * Tests for node validation logic in {@link IgfsProcessor}. + * <p> + * Tests starting with "testLocal" are checking + * {@link IgfsProcessor#validateLocalGgfsConfigurations(org.apache.ignite.configuration.IgfsConfiguration[])}. + * <p> + * Tests starting with "testRemote" are checking {@link IgfsProcessor#checkGgfsOnRemoteNode(org.apache.ignite.cluster.ClusterNode)}. + */ +public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid #1 config. */ + private IgniteConfiguration g1Cfg; + + /** Data cache 1 name. */ + private static final String dataCache1Name = "dataCache1"; + + /** Data cache 2 name. */ + private static final String dataCache2Name = "dataCache2"; + + /** Meta cache 1 name. */ + private static final String metaCache1Name = "metaCache1"; + + /** Meta cache 2 name. */ + private static final String metaCache2Name = "metaCache2"; + + /** First GGFS config in grid #1. */ + private IgfsConfiguration g1GgfsCfg1 = new IgfsConfiguration(); + + /** Second GGFS config in grid#1. */ + private IgfsConfiguration g1GgfsCfg2 = new IgfsConfiguration(); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + g1Cfg = getConfiguration("g1"); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + g1GgfsCfg1.setName("g1GgfsCfg1"); + g1GgfsCfg1.setDataCacheName(dataCache1Name); + g1GgfsCfg1.setMetaCacheName(metaCache1Name); + + g1GgfsCfg2.setName("g1GgfsCfg2"); + g1GgfsCfg2.setDataCacheName(dataCache2Name); + g1GgfsCfg2.setMetaCacheName(metaCache2Name); + + cfg.setGgfsConfiguration(g1GgfsCfg1, g1GgfsCfg2); + + cfg.setLocalHost("127.0.0.1"); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Returns a new array that contains the concatenated contents of two arrays. + * + * @param first the first array of elements to concatenate. + * @param second the second array of elements to concatenate. + * @param cls + * @return Concatenated array. + */ + private <T> T[] concat(T[] first, T[] second, Class<?> cls) { + Collection<T> res = new ArrayList<>(); + + res.addAll(Arrays.asList(first)); + res.addAll(Arrays.asList(second)); + + return res.toArray((T[]) Array.newInstance(cls, res.size())); + } + + + /** + * @throws Exception If failed. + */ + public void testLocalIfNoCacheIsConfigured() throws Exception { + checkGridStartFails(g1Cfg, "Data cache is not configured locally for GGFS", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfNoDataCacheIsConfigured() throws Exception { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setQueryIndexEnabled(false); + cc.setName("someName"); + + g1Cfg.setCacheConfiguration(cc); + + checkGridStartFails(g1Cfg, "Data cache is not configured locally for GGFS", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfNoMetadataCacheIsConfigured() throws Exception { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setQueryIndexEnabled(false); + cc.setName(dataCache1Name); + + g1Cfg.setCacheConfiguration(cc); + + checkGridStartFails(g1Cfg, "Metadata cache is not configured locally for GGFS", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfAffinityMapperIsWrongClass() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + for (CacheConfiguration cc : g1Cfg.getCacheConfiguration()) + cc.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper()); + + checkGridStartFails(g1Cfg, "Invalid GGFS data cache configuration (key affinity mapper class should be", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfGgfsConfigsHaveDifferentNames() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + String ggfsCfgName = "ggfs-cfg"; + + g1GgfsCfg1.setName(ggfsCfgName); + g1GgfsCfg2.setName(ggfsCfgName); + + checkGridStartFails(g1Cfg, "Duplicate GGFS name found (check configuration and assign unique name", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfQueryIndexingEnabledForDataCache() throws Exception { + CacheConfiguration[] dataCaches = dataCaches(1024); + + dataCaches[0].setQueryIndexEnabled(true); + + g1Cfg.setCacheConfiguration(concat(dataCaches, metaCaches(), CacheConfiguration.class)); + + checkGridStartFails(g1Cfg, "GGFS data cache cannot start with enabled query indexing", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfQueryIndexingEnabledForMetaCache() throws Exception { + CacheConfiguration[] metaCaches = metaCaches(); + + metaCaches[0].setQueryIndexEnabled(true); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches, CacheConfiguration.class)); + + checkGridStartFails(g1Cfg, "GGFS metadata cache cannot start with enabled query indexing", true); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("NullableProblems") + public void testLocalNullGgfsNameIsSupported() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + g1GgfsCfg1.setName(null); + + assertFalse(G.start(g1Cfg).cluster().nodes().isEmpty()); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfOffheapIsDisabledAndMaxSpaceSizeIsGreater() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + g1GgfsCfg2.setMaxSpaceSize(999999999999999999L); + + checkGridStartFails(g1Cfg, "Maximum GGFS space size cannot be greater that size of available heap", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfOffheapIsEnabledAndMaxSpaceSizeIsGreater() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + for (CacheConfiguration cc : g1Cfg.getCacheConfiguration()) + cc.setOffHeapMaxMemory(1000000); + + g1GgfsCfg2.setMaxSpaceSize(999999999999999999L); + + checkGridStartFails(g1Cfg, + "Maximum GGFS space size cannot be greater than size of available heap memory and offheap storage", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfBackupsEnabled() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + for (CacheConfiguration cc : g1Cfg.getCacheConfiguration()) { + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + } + + checkGridStartFails(g1Cfg, "GGFS data cache cannot be used with backups", true); + } + + /** + * @throws Exception If failed. + */ + public void testLocalIfNonPrimaryModeAndHadoopFileSystemUriIsNull() throws Exception { + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + g1GgfsCfg2.setDefaultMode(PROXY); + + checkGridStartFails(g1Cfg, "secondaryFileSystem cannot be null when mode is SECONDARY", true); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfDataBlockSizeDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + + g2GgfsCfg1.setBlockSize(g2GgfsCfg1.getBlockSize() + 100); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g1GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Data block size should be the same on all nodes in grid for GGFS", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfAffinityMapperGroupSizeDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(4021), metaCaches(), CacheConfiguration.class)); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Affinity mapper group size should be the same on all nodes in grid for GGFS", + false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfMetaCacheNameDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g2GgfsCfg1.setMetaCacheName("g2MetaCache1"); + g2GgfsCfg2.setMetaCacheName("g2MetaCache2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches("g2MetaCache1", "g2MetaCache2"), + CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Meta cache name should be the same on all nodes in grid for GGFS", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfMetaCacheNameEquals() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g2GgfsCfg1.setName("g2GgfsCfg1"); + g2GgfsCfg2.setName("g2GgfsCfg2"); + + g2GgfsCfg1.setDataCacheName("g2DataCache1"); + g2GgfsCfg2.setDataCacheName("g2DataCache2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024, "g2DataCache1", "g2DataCache2"), metaCaches(), + CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Meta cache names should be different for different GGFS instances", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfDataCacheNameDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g2GgfsCfg1.setDataCacheName("g2DataCache1"); + g2GgfsCfg2.setDataCacheName("g2DataCache2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024, "g2DataCache1", "g2DataCache2"), metaCaches(), + CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Data cache name should be the same on all nodes in grid for GGFS", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfDataCacheNameEquals() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g2GgfsCfg1.setName("g2GgfsCfg1"); + g2GgfsCfg2.setName("g2GgfsCfg2"); + + g2GgfsCfg1.setMetaCacheName("g2MetaCache1"); + g2GgfsCfg2.setMetaCacheName("g2MetaCache2"); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches("g2MetaCache1", "g2MetaCache2"), + CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Data cache names should be different for different GGFS instances", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfDefaultModeDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g1GgfsCfg1.setDefaultMode(DUAL_ASYNC); + g1GgfsCfg2.setDefaultMode(DUAL_ASYNC); + + g2GgfsCfg1.setDefaultMode(DUAL_SYNC); + g2GgfsCfg2.setDefaultMode(DUAL_SYNC); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Default mode should be the same on all nodes in grid for GGFS", false); + } + + /** + * @throws Exception If failed. + */ + public void testRemoteIfPathModeDiffers() throws Exception { + IgniteConfiguration g2Cfg = getConfiguration("g2"); + + IgfsConfiguration g2GgfsCfg1 = new IgfsConfiguration(g1GgfsCfg1); + IgfsConfiguration g2GgfsCfg2 = new IgfsConfiguration(g1GgfsCfg2); + + g2GgfsCfg1.setPathModes(Collections.singletonMap("/somePath", DUAL_SYNC)); + g2GgfsCfg2.setPathModes(Collections.singletonMap("/somePath", DUAL_SYNC)); + + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + g2Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + g2Cfg.setGgfsConfiguration(g2GgfsCfg1, g2GgfsCfg2); + + G.start(g1Cfg); + + checkGridStartFails(g2Cfg, "Path modes should be the same on all nodes in grid for GGFS", false); + } + + /** + * Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup. + * + * @param cfg Grid configuration to check. + * @param excMsgSnippet Root cause (assertion) exception message snippet. + * @param testLoc {@code True} if checking is done for "testLocal" tests. + */ + private void checkGridStartFails(IgniteConfiguration cfg, CharSequence excMsgSnippet, boolean testLoc) { + assertNotNull(cfg); + assertNotNull(excMsgSnippet); + + try { + G.start(cfg); + + fail("No exception has been thrown."); + } + catch (IgniteException e) { + if (testLoc) { + if ("Failed to start processor: GridProcessorAdapter []".equals(e.getMessage()) && + (e.getCause().getMessage().contains(excMsgSnippet) || + e.getCause().getCause().getMessage().contains(excMsgSnippet))) + return; // Expected exception. + } + else if (e.getMessage().contains(excMsgSnippet)) + return; // Expected exception. + + error("Caught unexpected exception.", e); + + fail(); + } + } + + /** + * @param grpSize Group size to use in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}. + * @param cacheNames 2 Optional caches names. + * @return 2 preconfigured data caches. + */ + private CacheConfiguration[] dataCaches(int grpSize, String... cacheNames) { + assertTrue(F.isEmpty(cacheNames) || cacheNames.length == 2); + + if (F.isEmpty(cacheNames)) + cacheNames = new String[] {dataCache1Name, dataCache2Name}; + + CacheConfiguration[] res = new CacheConfiguration[cacheNames.length]; + + for (int i = 0; i < cacheNames.length; i++) { + CacheConfiguration dataCache = defaultCacheConfiguration(); + + dataCache.setName(cacheNames[i]); + dataCache.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(grpSize)); + dataCache.setAtomicityMode(TRANSACTIONAL); + dataCache.setQueryIndexEnabled(false); + + res[i] = dataCache; + } + + return res; + } + + /** + * @param cacheNames 2 Optional caches names. + * @return 2 preconfigured meta caches. + */ + private CacheConfiguration[] metaCaches(String... cacheNames) { + assertTrue(F.isEmpty(cacheNames) || cacheNames.length == 2); + + if (F.isEmpty(cacheNames)) + cacheNames = new String[] {metaCache1Name, metaCache2Name}; + + CacheConfiguration[] res = new CacheConfiguration[cacheNames.length]; + + for (int i = 0; i < cacheNames.length; i++) { + CacheConfiguration metaCache = defaultCacheConfiguration(); + + metaCache.setName(cacheNames[i]); + metaCache.setAtomicityMode(TRANSACTIONAL); + metaCache.setQueryIndexEnabled(false); + + res[i] = metaCache; + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java new file mode 100644 index 0000000..e35cced --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.port.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * Base test class for {@link IgfsServer} checking IPC endpoint registrations. + */ +public abstract class IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest extends IgfsCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + private static final AtomicInteger mgmtPort = new AtomicInteger(DFLT_MGMT_PORT); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testLoopbackEndpointsRegistration() throws Exception { + IgniteConfiguration cfg = gridConfiguration(); + + cfg.setGgfsConfiguration( + igniteFsConfiguration("tcp", DFLT_IPC_PORT, null) + ); + + G.start(cfg); + + T2<Integer, Integer> res = checkRegisteredIpcEndpoints(); + + // One regular enpoint + one management endpoint. + assertEquals(2, res.get1().intValue()); + assertEquals(0, res.get2().intValue()); + } + + /** + * @throws Exception If failed. + */ + public void testLoopbackEndpointsCustomHostRegistration() throws Exception { + IgniteConfiguration cfg = gridConfiguration(); + + cfg.setGgfsConfiguration( + igniteFsConfiguration("tcp", DFLT_IPC_PORT, "127.0.0.1"), + igniteFsConfiguration("tcp", DFLT_IPC_PORT + 1, U.getLocalHost().getHostName())); + + G.start(cfg); + + T2<Integer, Integer> res = checkRegisteredIpcEndpoints(); + + // Two regular endpoints + two management endpoints. + assertEquals(4, res.get1().intValue()); + assertEquals(0, res.get2().intValue()); + } + + /** + * Counts all registered IPC endpoints. + * + * @return Tuple2 where (tcp endpoints count, shmem endpoints count). + */ + protected T2<Integer, Integer> checkRegisteredIpcEndpoints() throws Exception { + GridKernalContext ctx = ((IgniteKernal)grid()).context(); + + int tcp = 0; + int shmem = 0; + + for (GridPortRecord record : ctx.ports().records()) { + if (record.clazz() == IpcSharedMemoryServerEndpoint.class) + shmem++; + else if (record.clazz() == IpcServerTcpEndpoint.class) + tcp++; + } + + return new T2<>(tcp, shmem); + } + + /** + * Creates base grid configuration. + * + * @return Base grid configuration. + * @throws Exception In case of any error. + */ + protected IgniteConfiguration gridConfiguration() throws Exception { + IgniteConfiguration cfg = getConfiguration(getTestGridName()); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setName("partitioned"); + cc.setCacheMode(CacheMode.PARTITIONED); + cc.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); + cc.setBackups(0); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setQueryIndexEnabled(false); + + CacheConfiguration metaCfg = defaultCacheConfiguration(); + + metaCfg.setName("replicated"); + metaCfg.setCacheMode(CacheMode.REPLICATED); + metaCfg.setAtomicityMode(TRANSACTIONAL); + metaCfg.setQueryIndexEnabled(false); + + cfg.setCacheConfiguration(metaCfg, cc); + + return cfg; + } + + /** + * Creates test-purposed IgniteFsConfiguration. + * + * @param endPntType End point type. + * @param endPntPort End point port. + * @param endPntHost End point host. + * @return test-purposed IgniteFsConfiguration. + */ + protected IgfsConfiguration igniteFsConfiguration(@Nullable String endPntType, @Nullable Integer endPntPort, + @Nullable String endPntHost) throws IgniteCheckedException { + HashMap<String, String> endPntCfg = null; + + if (endPntType != null) { + endPntCfg = new HashMap<>(); + + endPntCfg.put("type", endPntType); + + if (endPntPort != null) + endPntCfg.put("port", String.valueOf(endPntPort)); + + if (endPntHost != null) + endPntCfg.put("host", endPntHost); + } + + IgfsConfiguration ggfsConfiguration = new IgfsConfiguration(); + + ggfsConfiguration.setDataCacheName("partitioned"); + ggfsConfiguration.setMetaCacheName("replicated"); + ggfsConfiguration.setName("ggfs" + UUID.randomUUID()); + ggfsConfiguration.setManagementPort(mgmtPort.getAndIncrement()); + + if (endPntCfg != null) + ggfsConfiguration.setIpcEndpointConfiguration(endPntCfg); + + return ggfsConfiguration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java new file mode 100644 index 0000000..6def2d3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; + +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * Tests for {@link IgfsServer} that checks all IPC endpoint registration types + * permitted for Linux and Mac OS. + */ +public class IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest + extends IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest { + /** + * @throws Exception If failed. + */ + public void testLoopbackAndShmemEndpointsRegistration() throws Exception { + IgniteConfiguration cfg = gridConfiguration(); + + cfg.setGgfsConfiguration( + igniteFsConfiguration(null, null, null), // Check null IPC endpoint config won't bring any hassles. + igniteFsConfiguration("tcp", DFLT_IPC_PORT + 1, null), + igniteFsConfiguration("shmem", DFLT_IPC_PORT + 2, null)); + + G.start(cfg); + + T2<Integer, Integer> res = checkRegisteredIpcEndpoints(); + + // 1 regular + 3 management TCP endpoins. + assertEquals(4, res.get1().intValue()); + assertEquals(2, res.get2().intValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java new file mode 100644 index 0000000..df9916f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +/** + * Tests for {@link IgfsServerManager} that checks shmem IPC endpoint registration + * forbidden for Windows. + */ +public class IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest + extends IgfsServerManagerIpcEndpointRegistrationAbstractSelfTest { + /** + * @throws Exception If failed. + */ + public void testShmemEndpointsRegistration() throws Exception { + Throwable e = GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteConfiguration cfg = gridConfiguration(); + + cfg.setGgfsConfiguration(igniteFsConfiguration("shmem", IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT, + null)); + + return G.start(cfg); + } + }, IgniteException.class, null); + + assert e.getCause().getCause().getMessage().contains(" should not be configured on Windows (configure " + + IpcServerTcpEndpoint.class.getSimpleName() + ")"); + } +}
