http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java deleted file mode 100644 index bb155b4..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; -import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; -import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; -import org.apache.ignite.internal.processors.igfs.IgfsImpl; -import org.apache.ignite.internal.processors.igfs.IgfsMetaManager; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; - -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.Callable; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; -import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; -import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.awaitFileClose; -import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.clear; -import static org.apache.ignite.internal.processors.igfs.IgfsAbstractSelfTest.create; - -/** - * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC. - */ -public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractTest { - /** IGFS block size. */ - protected static final int IGFS_BLOCK_SIZE = 512 * 1024; - - /** Amount of blocks to prefetch. */ - protected static final int PREFETCH_BLOCKS = 1; - - /** Amount of sequential block reads before prefetch is triggered. */ - protected static final int SEQ_READS_BEFORE_PREFETCH = 2; - - /** Secondary file system URI. */ - protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; - - /** Secondary file system configuration path. */ - protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; - - /** Primary file system URI. */ - protected static final String PRIMARY_URI = "igfs://igfs:grid@/"; - - /** Primary file system configuration path. */ - protected static final String PRIMARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback.xml"; - - /** Primary file system REST endpoint configuration map. */ - protected static final IgfsIpcEndpointConfiguration PRIMARY_REST_CFG; - - /** Secondary file system REST endpoint configuration map. */ - protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; - - /** Directory. */ - protected static final IgfsPath DIR = new IgfsPath("/dir"); - - /** Sub-directory. */ - protected static final IgfsPath SUBDIR = new IgfsPath(DIR, "subdir"); - - /** File. */ - protected static final IgfsPath FILE = new IgfsPath(SUBDIR, "file"); - - /** Default data chunk (128 bytes). */ - protected static byte[] chunk; - - /** Primary IGFS. */ - protected static IgfsImpl igfs; - - /** Secondary IGFS. */ - protected static IgfsImpl igfsSecondary; - - /** IGFS mode. */ - protected final IgfsMode mode; - - static { - PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration(); - - PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); - PRIMARY_REST_CFG.setPort(10500); - - SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); - - SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); - SECONDARY_REST_CFG.setPort(11500); - } - - /** - * Constructor. - * - * @param mode IGFS mode. - */ - protected HadoopIgfsDualAbstractSelfTest(IgfsMode mode) { - this.mode = mode; - assert mode == DUAL_SYNC || mode == DUAL_ASYNC; - } - - /** - * Start grid with IGFS. - * - * @param gridName Grid name. - * @param igfsName IGFS name - * @param mode IGFS mode. - * @param secondaryFs Secondary file system (optional). - * @param restCfg Rest configuration string (optional). - * @return Started grid instance. - * @throws Exception If failed. - */ - protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, - @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("dataCache"); - igfsCfg.setMetaCacheName("metaCache"); - igfsCfg.setName(igfsName); - igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); - igfsCfg.setDefaultMode(mode); - igfsCfg.setIpcEndpointConfiguration(restCfg); - igfsCfg.setSecondaryFileSystem(secondaryFs); - igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); - igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setNearConfiguration(null); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setOffHeapMaxMemory(0); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - return G.start(cfg); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - chunk = new byte[128]; - - for (int i = 0; i < chunk.length; i++) - chunk[i] = (byte)i; - - Ignite igniteSecondary = startGridWithIgfs("grid-secondary", "igfs-secondary", PRIMARY, null, SECONDARY_REST_CFG); - - IgfsSecondaryFileSystem hadoopFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); - - Ignite ignite = startGridWithIgfs("grid", "igfs", mode, hadoopFs, PRIMARY_REST_CFG); - - igfsSecondary = (IgfsImpl) igniteSecondary.fileSystem("igfs-secondary"); - igfs = (IgfsImpl) ignite.fileSystem("igfs"); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - clear(igfs); - clear(igfsSecondary); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - G.stopAll(true); - } - - /** - * Convenient method to group paths. - * - * @param paths Paths to group. - * @return Paths as array. - */ - protected IgfsPath[] paths(IgfsPath... paths) { - return paths; - } - - /** - * Check how prefetch override works. - * - * @throws Exception IF failed. - */ - public void testOpenPrefetchOverride() throws Exception { - create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE)); - - // Write enough data to the secondary file system. - final int blockSize = IGFS_BLOCK_SIZE; - - IgfsOutputStream out = igfsSecondary.append(FILE, false); - - int totalWritten = 0; - - while (totalWritten < blockSize * 2 + chunk.length) { - out.write(chunk); - - totalWritten += chunk.length; - } - - out.close(); - - awaitFileClose(igfsSecondary, FILE); - - // Instantiate file system with overridden "seq reads before prefetch" property. - Configuration cfg = new Configuration(); - - cfg.addResource(U.resolveIgniteUrl(PRIMARY_CFG)); - - int seqReads = SEQ_READS_BEFORE_PREFETCH + 1; - - cfg.setInt(String.format(PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, "igfs:grid@"), seqReads); - - FileSystem fs = FileSystem.get(new URI(PRIMARY_URI), cfg); - - // Read the first two blocks. - Path fsHome = new Path(PRIMARY_URI); - Path dir = new Path(fsHome, DIR.name()); - Path subdir = new Path(dir, SUBDIR.name()); - Path file = new Path(subdir, FILE.name()); - - FSDataInputStream fsIn = fs.open(file); - - final byte[] readBuf = new byte[blockSize * 2]; - - fsIn.readFully(0, readBuf, 0, readBuf.length); - - // Wait for a while for prefetch to finish (if any). - IgfsMetaManager meta = igfs.context().meta(); - - IgfsEntryInfo info = meta.info(meta.fileId(FILE)); - - IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2); - - IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache( - igfs.configuration().getDataCacheName()); - - for (int i = 0; i < 10; i++) { - if (dataCache.containsKey(key)) - break; - else - U.sleep(100); - } - - fsIn.close(); - - // Remove the file from the secondary file system. - igfsSecondary.delete(FILE, false); - - // Try reading the third block. Should fail. - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - IgfsInputStream in0 = igfs.open(FILE); - - in0.seek(blockSize * 2); - - try { - in0.read(readBuf); - } - finally { - U.closeQuiet(in0); - } - - return null; - } - }, IOException.class, - "Failed to read data due to secondary file system exception: /dir/subdir/file"); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java deleted file mode 100644 index 6c6e709..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAsyncSelfTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; - -/** - * Tests for DUAL_ASYNC mode. - */ -public class HadoopIgfsDualAsyncSelfTest extends HadoopIgfsDualAbstractSelfTest { - /** - * Constructor. - */ - public HadoopIgfsDualAsyncSelfTest() { - super(DUAL_ASYNC); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java deleted file mode 100644 index 96a63d5..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualSyncSelfTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; - -/** - * Tests for DUAL_SYNC mode. - */ -public class HadoopIgfsDualSyncSelfTest extends HadoopIgfsDualAbstractSelfTest { - /** - * Constructor. - */ - public HadoopIgfsDualSyncSelfTest() { - super(DUAL_SYNC); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java deleted file mode 100644 index f7af6f0..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemTestAdapter; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance. - */ -public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter { - /** File system factory. */ - private final HadoopFileSystemFactory factory; - - /** - * Constructor. - * @param factory File system factory. - */ - public HadoopIgfsSecondaryFileSystemTestAdapter(HadoopFileSystemFactory factory) { - assert factory != null; - - this.factory = factory; - } - - /** {@inheritDoc} */ - @Override public String name() throws IOException { - return get().getUri().toString(); - } - - /** {@inheritDoc} */ - @Override public boolean exists(String path) throws IOException { - return get().exists(new Path(path)); - } - - /** {@inheritDoc} */ - @Override public boolean delete(String path, boolean recursive) throws IOException { - return get().delete(new Path(path), recursive); - } - - /** {@inheritDoc} */ - @Override public void mkdirs(String path) throws IOException { - boolean ok = get().mkdirs(new Path(path)); - if (!ok) - throw new IOException("Failed to mkdirs: " + path); - } - - /** {@inheritDoc} */ - @Override public void format() throws IOException { - HadoopIgfsUtils.clear(get()); - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties(String path) throws IOException { - Path p = new Path(path); - - FileStatus status = get().getFileStatus(p); - - Map<String,String> m = new HashMap<>(3); - - m.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); - m.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); - m.put(IgfsUtils.PROP_PERMISSION, permission(status)); - - return m; - } - - /** {@inheritDoc} */ - @Override public String permissions(String path) throws IOException { - return permission(get().getFileStatus(new Path(path))); - } - - /** - * Get permission for file status. - * - * @param status Status. - * @return Permission. - */ - private String permission(FileStatus status) { - FsPermission perm = status.getPermission(); - - return "0" + perm.getUserAction().ordinal() + perm.getGroupAction().ordinal() + perm.getOtherAction().ordinal(); - } - - /** {@inheritDoc} */ - @Override public InputStream openInputStream(String path) throws IOException { - return get().open(new Path(path)); - } - - /** {@inheritDoc} */ - @Override public OutputStream openOutputStream(String path, boolean append) throws IOException { - Path p = new Path(path); - - if (append) - return get().append(p); - else - return get().create(p, true/*overwrite*/); - } - - /** {@inheritDoc} */ - @Override public T2<Long, Long> times(String path) throws IOException { - FileStatus status = get().getFileStatus(new Path(path)); - - return new T2<>(status.getAccessTime(), status.getModificationTime()); - } - - /** {@inheritDoc} */ - @Override public IgfsEx igfs() { - return null; - } - - /** - * Create file system. - * - * @return File system. - * @throws IOException If failed. - */ - protected FileSystem get() throws IOException { - return factory.get(FileSystemConfiguration.DFLT_USER_NAME); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java deleted file mode 100644 index d9b5d66..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; -import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.communication.CommunicationSpi; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.Callable; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; -import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; -import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; -import static org.apache.ignite.igfs.IgfsMode.PROXY; -import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT; - -/** - * Tests secondary file system configuration. - */ -public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest { - /** IGFS scheme */ - static final String IGFS_SCHEME = "igfs"; - - /** Primary file system authority. */ - private static final String PRIMARY_AUTHORITY = "igfs:grid0@"; - - /** Autogenerated secondary file system configuration path. */ - private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml"; - - /** Secondary file system authority. */ - private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500"; - - /** Autogenerated secondary file system configuration path. */ - static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; - - /** Secondary endpoint configuration. */ - protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; - - /** Group size. */ - public static final int GRP_SIZE = 128; - - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Primary file system URI. */ - protected URI primaryFsUri; - - /** Primary file system. */ - private FileSystem primaryFs; - - /** Full path of primary Fs configuration */ - private String primaryConfFullPath; - - /** Input primary Fs uri */ - private String primaryFsUriStr; - - /** Input URI scheme for configuration */ - private String primaryCfgScheme; - - /** Input URI authority for configuration */ - private String primaryCfgAuthority; - - /** if to pass configuration */ - private boolean passPrimaryConfiguration; - - /** Full path of s Fs configuration */ - private String secondaryConfFullPath; - - /** /Input URI scheme for configuration */ - private String secondaryFsUriStr; - - /** Input URI scheme for configuration */ - private String secondaryCfgScheme; - - /** Input URI authority for configuration */ - private String secondaryCfgAuthority; - - /** if to pass configuration */ - private boolean passSecondaryConfiguration; - - /** Default IGFS mode. */ - protected final IgfsMode mode; - - /** Skip embedded mode flag. */ - private final boolean skipEmbed; - - /** Skip local shmem flag. */ - private final boolean skipLocShmem; - - static { - SECONDARY_ENDPOINT_CFG = new IgfsIpcEndpointConfiguration(); - - SECONDARY_ENDPOINT_CFG.setType(IgfsIpcEndpointType.TCP); - SECONDARY_ENDPOINT_CFG.setPort(11500); - } - - /** - * Constructor. - * - * @param mode Default IGFS mode. - * @param skipEmbed Whether to skip embedded mode. - * @param skipLocShmem Whether to skip local shmem mode. - */ - protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) { - this.mode = mode; - this.skipEmbed = skipEmbed; - this.skipLocShmem = skipLocShmem; - } - - /** - * Default constructor. - */ - public HadoopSecondaryFileSystemConfigurationTest() { - this(PROXY, true, false); - } - - /** - * Executes before each test. - * @throws Exception - */ - private void before() throws Exception { - initSecondary(); - - if (passPrimaryConfiguration) { - Configuration primaryFsCfg = configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem); - - primaryConfFullPath = writeConfiguration(primaryFsCfg, PRIMARY_CFG_PATH); - } - else - primaryConfFullPath = null; - - CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); - - fac.setConfigPaths(primaryConfFullPath); - fac.setUri(primaryFsUriStr); - - fac.start(); - - primaryFs = fac.get(null); //provider.createFileSystem(null); - - primaryFsUri = primaryFs.getUri(); - } - - /** - * Executes after each test. - * @throws Exception - */ - private void after() throws Exception { - if (primaryFs != null) { - try { - primaryFs.delete(new Path("/"), true); - } - catch (Exception ignore) { - // No-op. - } - - U.closeQuiet(primaryFs); - } - - G.stopAll(true); - - delete(primaryConfFullPath); - delete(secondaryConfFullPath); - } - - /** - * Utility method to delete file. - * - * @param file the file path to delete. - */ - @SuppressWarnings("ResultOfMethodCallIgnored") - private static void delete(String file) { - if (file != null) { - new File(file).delete(); - - assertFalse(new File(file).exists()); - } - } - - /** - * Initialize underlying secondary filesystem. - * - * @throws Exception - */ - private void initSecondary() throws Exception { - if (passSecondaryConfiguration) { - Configuration secondaryConf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true); - - secondaryConf.setInt("fs.igfs.block.size", 1024); - - secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); - } - else - secondaryConfFullPath = null; - - startNodes(); - } - - /** - * Starts the nodes for this test. - * - * @throws Exception If failed. - */ - private void startNodes() throws Exception { - if (mode != PRIMARY) - startSecondary(); - - startGrids(4); - } - - /** - * Starts secondary IGFS - */ - private void startSecondary() { - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("partitioned"); - igfsCfg.setMetaCacheName("replicated"); - igfsCfg.setName("igfs_secondary"); - igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG); - igfsCfg.setBlockSize(512 * 1024); - igfsCfg.setPrefetchBlocks(1); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setNearConfiguration(null); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); - cacheCfg.setBackups(0); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("replicated"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName("grid_secondary"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setFileSystemConfiguration(igfsCfg); - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - cfg.setCommunicationSpi(communicationSpi()); - - G.start(cfg); - } - - /** - * Get primary IPC endpoint configuration. - * - * @param gridName Grid name. - * @return IPC primary endpoint configuration. - */ - protected IgfsIpcEndpointConfiguration primaryIpcEndpointConfiguration(final String gridName) { - IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration(); - - cfg.setType(IgfsIpcEndpointType.TCP); - cfg.setPort(DFLT_IPC_PORT + getTestGridIndex(gridName)); - - return cfg; - } - - /** {@inheritDoc} */ - @Override public String getTestGridName() { - return "grid"; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(cacheConfiguration()); - cfg.setFileSystemConfiguration(fsConfiguration(gridName)); - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - cfg.setCommunicationSpi(communicationSpi()); - - return cfg; - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ - protected CacheConfiguration[] cacheConfiguration() { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setNearConfiguration(null); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); - cacheCfg.setBackups(0); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("replicated"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; - } - - /** - * Gets IGFS configuration. - * - * @param gridName Grid name. - * @return IGFS configuration. - */ - protected FileSystemConfiguration fsConfiguration(String gridName) throws IgniteCheckedException { - FileSystemConfiguration cfg = new FileSystemConfiguration(); - - cfg.setDataCacheName("partitioned"); - cfg.setMetaCacheName("replicated"); - cfg.setName("igfs"); - cfg.setPrefetchBlocks(1); - cfg.setDefaultMode(mode); - - if (mode != PRIMARY) - cfg.setSecondaryFileSystem( - new IgniteHadoopIgfsSecondaryFileSystem(secondaryFsUriStr, secondaryConfFullPath)); - - cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); - - cfg.setManagementPort(-1); - cfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. - - return cfg; - } - - /** @return Communication SPI. */ - private CommunicationSpi communicationSpi() { - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - return commSpi; - } - - /** - * Case #SecondaryFileSystemProvider(null, path) - * - * @throws Exception On failure. - */ - public void testFsConfigurationOnly() throws Exception { - primaryCfgScheme = IGFS_SCHEME; - primaryCfgAuthority = PRIMARY_AUTHORITY; - passPrimaryConfiguration = true; - primaryFsUriStr = null; - - // wrong secondary URI in the configuration: - secondaryCfgScheme = IGFS_SCHEME; - secondaryCfgAuthority = SECONDARY_AUTHORITY; - passSecondaryConfiguration = true; - secondaryFsUriStr = null; - - check(); - } - - /** - * Case #SecondaryFileSystemProvider(uri, path), when 'uri' parameter overrides - * the Fs uri set in the configuration. - * - * @throws Exception On failure. - */ - public void testFsUriOverridesUriInConfiguration() throws Exception { - // wrong primary URI in the configuration: - primaryCfgScheme = "foo"; - primaryCfgAuthority = "moo:zoo@bee"; - passPrimaryConfiguration = true; - primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY); - - // wrong secondary URI in the configuration: - secondaryCfgScheme = "foo"; - secondaryCfgAuthority = "moo:zoo@bee"; - passSecondaryConfiguration = true; - secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); - - check(); - } - - /** - * Perform actual check. - * - * @throws Exception If failed. - */ - @SuppressWarnings("deprecation") - private void check() throws Exception { - before(); - - try { - Path fsHome = new Path(primaryFsUri); - Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3"); - Path file = new Path(dir, "someFile"); - - assertPathDoesNotExist(primaryFs, file); - - FsPermission fsPerm = new FsPermission((short)644); - - FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null); - - // Try to write something in file. - os.write("abc".getBytes()); - - os.close(); - - // Check file status. - FileStatus fileStatus = primaryFs.getFileStatus(file); - - assertFalse(fileStatus.isDir()); - assertEquals(file, fileStatus.getPath()); - assertEquals(fsPerm, fileStatus.getPermission()); - } - finally { - after(); - } - } - - /** - * Create configuration for test. - * - * @param skipEmbed Whether to skip embedded mode. - * @param skipLocShmem Whether to skip local shmem mode. - * @return Configuration. - */ - static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { - final Configuration cfg = new Configuration(); - - if (scheme != null && authority != null) - cfg.set("fs.defaultFS", scheme + "://" + authority + "/"); - - setImplClasses(cfg); - - if (authority != null) { - if (skipEmbed) - cfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true); - - if (skipLocShmem) - cfg.setBoolean(String.format(HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); - } - - return cfg; - } - - /** - * Sets Hadoop Fs implementation classes. - * - * @param cfg the configuration to set parameters into. - */ - static void setImplClasses(Configuration cfg) { - cfg.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); - - cfg.set("fs.AbstractFileSystem.igfs.impl", - org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem.class.getName()); - } - - /** - * Check path does not exist in a given FileSystem. - * - * @param fs FileSystem to check. - * @param path Path to check. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - private void assertPathDoesNotExist(final FileSystem fs, final Path path) { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - return fs.getFileStatus(path); - } - }, FileNotFoundException.class, null); - } - - /** - * Writes down the configuration to local disk and returns its path. - * - * @param cfg the configuration to write. - * @param pathFromIgniteHome path relatively to Ignite home. - * @return Full path of the written configuration. - */ - static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { - if (!pathFromIgniteHome.startsWith("/")) - pathFromIgniteHome = "/" + pathFromIgniteHome; - - final String path = U.getIgniteHome() + pathFromIgniteHome; - - delete(path); - - File file = new File(path); - - try (FileOutputStream fos = new FileOutputStream(file)) { - cfg.writeXml(fos); - } - - assertTrue(file.exists()); - return path; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 3 * 60 * 1000; - } - - /** - * Makes URI. - * - * @param scheme the scheme - * @param authority the authority - * @return URI String - */ - static String mkUri(String scheme, String authority) { - return scheme + "://" + authority + "/"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java deleted file mode 100644 index a9d7bad..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import junit.framework.TestSuite; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; -import org.apache.ignite.internal.util.typedef.G; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; -import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; - -/** - * Test suite for IGFS event tests. - */ -@SuppressWarnings("PublicInnerClass") -public class IgfsEventsTestSuite extends TestSuite { - /** - * @return Test suite. - * @throws Exception Thrown in case of the failure. - */ - public static TestSuite suite() throws Exception { - ClassLoader ldr = TestSuite.class.getClassLoader(); - - TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); - - suite.addTest(new TestSuite(ldr.loadClass(ShmemPrimary.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName()))); - - suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); - - return suite; - } - - /** - * @return Test suite with only tests that are supported on all platforms. - * @throws Exception Thrown in case of the failure. - */ - public static TestSuite suiteNoarchOnly() throws Exception { - ClassLoader ldr = TestSuite.class.getClassLoader(); - - TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only"); - - suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); - - return suite; - } - - /** - * Shared memory IPC in PRIVATE mode. - */ - public static class ShmemPrimary extends IgfsEventsAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(IgfsMode.PRIMARY); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.SHMEM); - endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - return igfsCfg; - } - } - - /** - * Loopback socket IPS in PRIVATE mode. - */ - public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(IgfsMode.PRIMARY); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - return igfsCfg; - } - } - - /** - * Base class for all IGFS tests with primary and secondary file system. - */ - public abstract static class PrimarySecondaryTest extends IgfsEventsAbstractSelfTest { - /** Secondary file system. */ - private static IgniteFileSystem igfsSec; - - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( - "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", - "modules/core/src/test/config/hadoop/core-site-secondary.xml")); - - return igfsCfg; - } - - /** - * @return IGFS configuration for secondary file system. - */ - protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setName("igfs-secondary"); - igfsCfg.setDefaultMode(PRIMARY); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setPort(11500); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - return igfsCfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - igfsSec = startSecondary(); - - super.beforeTestsStarted(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - G.stopAll(true); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - // Clean up secondary file system. - igfsSec.format(); - } - - /** - * Start a grid with the secondary file system. - * - * @return Secondary file system handle. - * @throws Exception If failed. - */ - @Nullable private IgniteFileSystem startSecondary() throws Exception { - IgniteConfiguration cfg = getConfiguration("grid-secondary", getSecondaryIgfsConfiguration()); - - cfg.setLocalHost("127.0.0.1"); - cfg.setPeerClassLoadingEnabled(false); - - Ignite secG = G.start(cfg); - - return secG.fileSystem("igfs-secondary"); - } - } - - /** - * Shared memory IPC in DUAL_SYNC mode. - */ - public static class ShmemDualSync extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(DUAL_SYNC); - - return igfsCfg; - } - } - - /** - * Shared memory IPC in DUAL_SYNC mode. - */ - public static class ShmemDualAsync extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(DUAL_ASYNC); - - return igfsCfg; - } - } - - /** - * Loopback socket IPC with secondary file system. - */ - public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(IgfsMode.PRIMARY); - - igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( - "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", - "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); - - return igfsCfg; - } - - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getSecondaryIgfsConfiguration(); - - igfsCfg.setName("igfs-secondary"); - igfsCfg.setDefaultMode(PRIMARY); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setPort(11500); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - return igfsCfg; - } - } - - /** - * Loopback IPC in DUAL_SYNC mode. - */ - public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(DUAL_SYNC); - - return igfsCfg; - } - } - - /** - * Loopback socket IPC in DUAL_ASYNC mode. - */ - public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { - FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); - - igfsCfg.setDefaultMode(DUAL_ASYNC); - - return igfsCfg; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java deleted file mode 100644 index 8e79356..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collection; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; -import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; -import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED; - -/** - * Test hadoop file system implementation. - */ -public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { - /** Path to the default hadoop configuration. */ - public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml"; - - /** Group size. */ - public static final int GRP_SIZE = 128; - - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Node count. */ - private int cnt; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrids(nodeCount()); - - grid(0).createNearCache("data", new NearCacheConfiguration()); - - grid(0).createNearCache("meta", new NearCacheConfiguration()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - G.stopAll(true); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true)); - - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("data"); - igfsCfg.setMetaCacheName("meta"); - igfsCfg.setName("igfs"); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.SHMEM); - endpointCfg.setPort(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. - - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setCacheConfiguration(cacheConfiguration(gridName, "data"), cacheConfiguration(gridName, "meta")); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - if (cnt == 0) - cfg.setClientMode(true); - - cnt++; - - return cfg; - } - - /** @return Node count for test. */ - protected int nodeCount() { - return 4; - } - - /** - * Gets cache configuration. - * - * @param gridName Grid name. - * @return Cache configuration. - */ - protected CacheConfiguration cacheConfiguration(String gridName, String cacheName) { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName(cacheName); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); - cacheCfg.setBackups(0); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - return cacheCfg; - } - - /** - * Gets config of concrete File System. - * - * @return Config of concrete File System. - */ - protected Configuration getFileSystemConfig() { - Configuration cfg = new Configuration(); - - cfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG)); - - return cfg; - } - - /** - * Gets File System name. - * - * @param grid Grid index. - * @return File System name. - */ - protected URI getFileSystemURI(int grid) { - try { - return new URI("igfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid)); - } - catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - /** @throws Exception If failed. */ - public void testContentsConsistency() throws Exception { - try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) { - Collection<IgniteBiTuple<String, Long>> files = F.asList( - F.t("/dir1/dir2/file1", 1024L), - F.t("/dir1/dir2/file2", 8 * 1024L), - F.t("/dir1/file1", 1024 * 1024L), - F.t("/dir1/file2", 5 * 1024 * 1024L), - F.t("/file1", 64 * 1024L + 13), - F.t("/file2", 13L), - F.t("/file3", 123764L) - ); - - for (IgniteBiTuple<String, Long> file : files) { - - info("Writing file: " + file.get1()); - - try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) { - byte[] data = new byte[file.get2().intValue()]; - - data[0] = 25; - data[data.length - 1] = 26; - - os.write(data); - } - - info("Finished writing file: " + file.get1()); - } - - for (int i = 1; i < nodeCount(); i++) { - - try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) { - for (IgniteBiTuple<String, Long> file : files) { - Path path = new Path(file.get1()); - - FileStatus fileStatus = fs.getFileStatus(path); - - assertEquals(file.get2(), (Long)fileStatus.getLen()); - - byte[] read = new byte[file.get2().intValue()]; - - info("Reading file: " + path); - - try (FSDataInputStream in = fs.open(path)) { - in.readFully(read); - - assert read[0] == 25; - assert read[read.length - 1] == 26; - } - - info("Finished reading file: " + path); - } - } - } - } - } -} \ No newline at end of file