http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 99ca1ec..0d7de86 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -17,22 +17,6 @@ package org.apache.ignite.hadoop.fs.v2; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.InvalidPathException; @@ -51,13 +36,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; @@ -74,8 +60,26 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser; @@ -92,8 +96,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; /** * {@code IGFS} Hadoop 2.x file system driver over file system API. To use @@ -168,8 +170,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Mode resolver. */ private IgfsModeResolver modeRslvr; - /** Secondary file system instance. */ - private AbstractFileSystem secondaryFs; + /** The secondary file system factory. */ + private HadoopFileSystemFactory factory; /** Whether custom sequential reads before prefetch value is provided. */ private boolean seqReadsBeforePrefetchOverride; @@ -335,20 +337,27 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea } if (initSecondary) { - Map<String, String> props = paths.properties(); + try { + factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to get secondary file system factory.", e); + } - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); + assert factory != null; + + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).start(); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + FileSystem secFs = factory.get(user); - secondaryFs = secProvider.createAbstractFileSystem(user); + secondaryUri = secFs.getUri(); - secondaryUri = secProvider.uri(); + A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); } catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); + throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); } } } @@ -368,6 +377,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.close(); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); + // Reset initialized resources. rmtClient = null; } @@ -391,13 +403,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** {@inheritDoc} */ @Override public boolean setReplication(Path f, short replication) throws IOException { - return mode(f) == PROXY && secondaryFs.setReplication(f, replication); + return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication); } /** {@inheritDoc} */ @Override public void setTimes(Path f, long mtime, long atime) throws IOException { if (mode(f) == PROXY) - secondaryFs.setTimes(f, mtime, atime); + secondaryFileSystem().setTimes(f, mtime, atime); else { if (mtime == -1 && atime == -1) return; @@ -421,7 +433,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea A.notNull(p, "p"); if (mode(p) == PROXY) - secondaryFs.setPermission(toSecondary(p), perm); + secondaryFileSystem().setPermission(toSecondary(p), perm); else { if (rmtClient.update(convert(p), permission(perm)) == null) throw new IOException("Failed to set file permission (file not found?)" + @@ -443,7 +455,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { if (mode(p) == PROXY) - secondaryFs.setOwner(toSecondary(p), usr, grp); + secondaryFileSystem().setOwner(toSecondary(p), usr, grp); else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null) throw new IOException("Failed to set file permission (file not found?)" + " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); @@ -464,11 +476,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea IgfsMode mode = modeRslvr.resolveMode(path); if (mode == PROXY) { - FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); + FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize); if (clientLog.isLogEnabled()) { // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); + FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f)); long size = status != null ? status.getLen() : -1; @@ -543,8 +555,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); if (mode == PROXY) { - FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize, - replication, blockSize, progress, checksumOpt, createParent); + FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize, + replication, blockSize, progress); if (clientLog.isLogEnabled()) { long logId = IgfsLogger.nextId(); @@ -641,7 +653,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.logRename(srcPath, PROXY, dstPath); - secondaryFs.renameInternal(toSecondary(src), toSecondary(dst)); + secondaryFileSystem().rename(toSecondary(src), toSecondary(dst)); } else { if (clientLog.isLogEnabled()) @@ -671,7 +683,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.logDelete(path, PROXY, recursive); - return secondaryFs.delete(toSecondary(f), recursive); + return secondaryFileSystem().delete(toSecondary(f), recursive); } boolean res = rmtClient.delete(path, recursive); @@ -689,14 +701,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** {@inheritDoc} */ @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { // Checksum has effect for secondary FS only. - if (secondaryFs != null) - secondaryFs.setVerifyChecksum(verifyChecksum); + if (factory != null) + secondaryFileSystem().setVerifyChecksum(verifyChecksum); } /** {@inheritDoc} */ @Override public FileChecksum getFileChecksum(Path f) throws IOException { if (mode(f) == PROXY) - return secondaryFs.getFileChecksum(f); + return secondaryFileSystem().getFileChecksum(f); return null; } @@ -712,7 +724,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea IgfsMode mode = modeRslvr.resolveMode(path); if (mode == PROXY) { - FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); + FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f)); if (arr == null) throw new FileNotFoundException("File " + f + " does not exist."); @@ -775,7 +787,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.logMakeDirectory(path, PROXY); - secondaryFs.mkdir(toSecondary(f), perm, createParent); + secondaryFileSystem().mkdirs(toSecondary(f), perm); } else { rmtClient.mkdirs(path, permission(perm)); @@ -797,7 +809,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { if (mode(f) == PROXY) - return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); + return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f))); else { IgfsFile info = rmtClient.info(convert(f)); @@ -822,7 +834,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { if (modeRslvr.resolveMode(igfsPath) == PROXY) - return secondaryFs.getFileBlockLocations(path, start, len); + return secondaryFileSystem().getFileBlockLocations(path, start, len); else { long now = System.currentTimeMillis(); @@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea * @return Secondary file system path. */ private Path toSecondary(Path path) { - assert secondaryFs != null; + assert factory != null; assert secondaryUri != null; return convertPath(path, secondaryUri); @@ -1045,4 +1057,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea public String user() { return user; } + + /** + * Gets cached or creates a {@link FileSystem}. + * + * @return The secondary file system. + */ + private FileSystem secondaryFileSystem() throws IOException{ + assert factory != null; + + return factory.get(user); + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java deleted file mode 100644 index d5be074..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.security.PrivilegedExceptionAction; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Encapsulates logic of secondary filesystem creation. - */ -public class SecondaryFileSystemProvider { - /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); - - /** The secondary filesystem URI, never null. */ - private final URI uri; - - /** - * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be - * specified either explicitly or in the configuration provided. - * - * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS" - * property in the provided configuration. - * @param secConfPath the secondary Fs path (file path on the local file system, optional). - * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @throws IOException - */ - public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath) throws IOException { - if (secConfPath != null) { - URL url = U.resolveIgniteUrl(secConfPath); - - if (url == null) { - // If secConfPath is given, it should be resolvable: - throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + secConfPath); - } - - cfg.addResource(url); - } - - // if secondary fs URI is not given explicitly, try to get it from the configuration: - if (secUri == null) - uri = FileSystem.getDefaultUri(cfg); - else { - try { - uri = new URI(secUri); - } - catch (URISyntaxException use) { - throw new IOException("Failed to resolve secondary file system URI: " + secUri); - } - } - - // Disable caching: - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); - - cfg.setBoolean(prop, true); - } - - /** - * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. - * @throws IOException - */ - public FileSystem createFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(userName); - - final FileSystem fileSys; - - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - - return fileSys; - } - - /** - * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException in case of error. - */ - public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(userName); - - String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); - - UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); - - try { - return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() { - @Override public AbstractFileSystem run() throws IOException { - return AbstractFileSystem.get(uri, cfg); - } - }); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", ie); - } - } - - /** - * @return the secondary fs URI, never null. - */ - public URI uri() { - return uri; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java index 48ade79..1ecbee5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java @@ -39,7 +39,7 @@ public class HadoopFileSystemCacheUtils { public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { return new HadoopLazyConcurrentMap<>( new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { - @Override public FileSystem createValue(FsCacheKey key) { + @Override public FileSystem createValue(FsCacheKey key) throws IOException { try { assert key != null; @@ -57,8 +57,10 @@ public class HadoopFileSystemCacheUtils { return FileSystem.get(uri, cfg, key.user()); } - catch (IOException | InterruptedException ioe) { - throw new IgniteException(ioe); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index 89eaf73..681cddb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.fs; import java.io.Closeable; +import java.io.IOException; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; @@ -204,8 +205,8 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> { * * @param key the key to create value for * @return the value. - * @throws IgniteException on failure. + * @throws IOException On failure. */ - public V createValue(K key); + public V createValue(K key) throws IOException; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java index ea65464..10b1bcd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.igfs; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest; @@ -74,12 +74,16 @@ public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest { prepareConfiguration(); - IgniteHadoopIgfsSecondaryFileSystem second = - new IgniteHadoopIgfsSecondaryFileSystem(secondaryUri, secondaryConfFullPath); + CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory(); - FileSystem fileSystem = second.fileSystem(); + factory.setUri(secondaryUri); + factory.setConfigPaths(secondaryConfFullPath); - igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(fileSystem); + IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem(); + + second.setFileSystemFactory(factory); + + igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(factory); return second; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java new file mode 100644 index 0000000..1d02f0f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import java.io.Externalizable; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * Tests for Hadoop file system factory. + */ +public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { + /** Amount of "start" invocations */ + private static final AtomicInteger START_CNT = new AtomicInteger(); + + /** Amount of "stop" invocations */ + private static final AtomicInteger STOP_CNT = new AtomicInteger(); + + /** Path to secondary file system configuration. */ + private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml"; + + /** IGFS path for DUAL mode. */ + private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir"); + + /** IGFS path for PROXY mode. */ + private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir"); + + /** IGFS path for DUAL mode. */ + private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir"); + + /** IGFS path for PROXY mode. */ + private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir"); + + /** Secondary IGFS. */ + private IgfsEx secondary; + + /** Primary IGFS. */ + private IgfsEx primary; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + START_CNT.set(0); + STOP_CNT.set(0); + + secondary = startSecondary(); + primary = startPrimary(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + secondary = null; + primary = null; + + stopAllGrids(); + } + + /** + * Test custom factory. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public void testCustomFactory() throws Exception { + assert START_CNT.get() == 1; + assert STOP_CNT.get() == 0; + + // Use IGFS directly. + primary.mkdirs(IGFS_PATH_DUAL); + + assert primary.exists(IGFS_PATH_DUAL); + assert secondary.exists(IGFS_PATH_DUAL); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + primary.mkdirs(IGFS_PATH_PROXY); + + return null; + } + }, IgfsInvalidPathException.class, null); + + // Create remote instance. + FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration()); + + // Ensure lifecycle callback was invoked. + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 0; + + // Check file system operations. + assert fs.exists(PATH_DUAL); + + assert fs.delete(PATH_DUAL, true); + assert !primary.exists(IGFS_PATH_DUAL); + assert !secondary.exists(IGFS_PATH_DUAL); + assert !fs.exists(PATH_DUAL); + + assert fs.mkdirs(PATH_DUAL); + assert primary.exists(IGFS_PATH_DUAL); + assert secondary.exists(IGFS_PATH_DUAL); + assert fs.exists(PATH_DUAL); + + assert fs.mkdirs(PATH_PROXY); + assert secondary.exists(IGFS_PATH_PROXY); + assert fs.exists(PATH_PROXY); + + // Close file system and ensure that associated factory was notified. + fs.close(); + + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 1; + + // Stop primary node and ensure that base factory was notified. + G.stop(primary.context().kernalContext().grid().name(), true); + + assert START_CNT.get() == 2; + assert STOP_CNT.get() == 2; + } + + /** + * Start secondary IGFS. + * + * @return IGFS. + * @throws Exception If failed. + */ + private static IgfsEx startSecondary() throws Exception { + return start("secondary", 11500, IgfsMode.PRIMARY, null); + } + + /** + * Start primary IGFS. + * + * @return IGFS. + * @throws Exception If failed. + */ + private static IgfsEx startPrimary() throws Exception { + // Prepare configuration. + Configuration conf = baseConfiguration(); + + conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/"); + + writeConfigurationToFile(conf); + + // Configure factory. + TestFactory factory = new TestFactory(); + + factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/"); + factory.setConfigPaths(SECONDARY_CFG_PATH); + + // Configure file system. + IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem(); + + fs.setFileSystemFactory(factory); + + // Start. + return start("primary", 10500, IgfsMode.PRIMARY, fs); + } + + /** + * Start Ignite node with IGFS instance. + * + * @param name Node and IGFS name. + * @param endpointPort Endpoint port. + * @param dfltMode Default path mode. + * @param secondaryFs Secondary file system. + * @return Igfs instance. + */ + private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode, + @Nullable IgfsSecondaryFileSystem secondaryFs) { + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); + + endpointCfg.setType(IgfsIpcEndpointType.TCP); + endpointCfg.setHost("127.0.0.1"); + endpointCfg.setPort(endpointPort); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(name); + igfsCfg.setDefaultMode(dfltMode); + igfsCfg.setIpcEndpointConfiguration(endpointCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(name); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return (IgfsEx)G.start(cfg).fileSystem(name); + } + + /** + * Create base FileSystem configuration. + * + * @return Configuration. + */ + private static Configuration baseConfiguration() { + Configuration conf = new Configuration(); + + conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + return conf; + } + + /** + * Write configuration to file. + * + * @param conf Configuration. + * @throws Exception If failed. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static void writeConfigurationToFile(Configuration conf) throws Exception { + final String path = U.getIgniteHome() + SECONDARY_CFG_PATH; + + File file = new File(path); + + file.delete(); + + assertFalse(file.exists()); + + try (FileOutputStream fos = new FileOutputStream(file)) { + conf.writeXml(fos); + } + + assertTrue(file.exists()); + } + + /** + * Test factory. + */ + private static class TestFactory extends CachingHadoopFileSystemFactory { + /** + * {@link Externalizable} support. + */ + public TestFactory() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + START_CNT.incrementAndGet(); + + super.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + STOP_CNT.incrementAndGet(); + + super.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java index 608bd25..5b6fd81 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter; @@ -34,55 +36,55 @@ import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter; * Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance. */ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFileSystemAdapter { - /** The wrapped filesystem. */ - private final FileSystem fileSys; + /** File system factory. */ + private final HadoopFileSystemFactory factory; /** * Constructor. - * @param fs the filesystem to be wrapped. + * @param factory File system factory. */ - public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) { - this.fileSys = fs; + public HadoopFileSystemUniversalFileSystemAdapter(HadoopFileSystemFactory factory) { + assert factory != null; + + this.factory = factory; } /** {@inheritDoc} */ - @Override public String name() { - return fileSys.getUri().toString(); + @Override public String name() throws IOException { + return get().getUri().toString(); } /** {@inheritDoc} */ @Override public boolean exists(String path) throws IOException { - return fileSys.exists(new Path(path)); + return get().exists(new Path(path)); } /** {@inheritDoc} */ @Override public boolean delete(String path, boolean recursive) throws IOException { - boolean ok = fileSys.delete(new Path(path), recursive); - return ok; + return get().delete(new Path(path), recursive); } /** {@inheritDoc} */ @Override public void mkdirs(String path) throws IOException { - boolean ok = fileSys.mkdirs(new Path(path)); + boolean ok = get().mkdirs(new Path(path)); if (!ok) throw new IOException("Failed to mkdirs: " + path); } /** {@inheritDoc} */ @Override public void format() throws IOException { - HadoopIgfsUtils.clear(fileSys); + HadoopIgfsUtils.clear(get()); } /** {@inheritDoc} */ @Override public Map<String, String> properties(String path) throws IOException { Path p = new Path(path); - FileStatus status = fileSys.getFileStatus(p); + FileStatus status = get().getFileStatus(p); Map<String,String> m = new HashMap<>(3); // max size == 4 m.put(IgfsEx.PROP_USER_NAME, status.getOwner()); - m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup()); FsPermission perm = status.getPermission(); @@ -95,7 +97,7 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile /** {@inheritDoc} */ @Override public InputStream openInputStream(String path) throws IOException { - return fileSys.open(new Path(path)); + return get().open(new Path(path)); } /** {@inheritDoc} */ @@ -103,16 +105,27 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile Path p = new Path(path); if (append) - return fileSys.append(p); + return get().append(p); else - return fileSys.create(p, true/*overwrite*/); + return get().create(p, true/*overwrite*/); } /** {@inheritDoc} */ - @Override public <T> T getAdapter(Class<T> clazz) { - if (clazz == FileSystem.class) - return (T)fileSys; + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (HadoopFileSystemFactory.class.isAssignableFrom(cls)) + return (T)factory; return null; } + + /** + * Create file system. + * + * @return File system. + * @throws IOException If failed. + */ + private FileSystem get() throws IOException { + return factory.get(FileSystemConfiguration.DFLT_USER_NAME); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 4ddfb0d..d9b5d66 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -17,12 +17,6 @@ package org.apache.ignite.igfs; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.Callable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -34,9 +28,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; import org.apache.ignite.internal.util.typedef.G; @@ -48,6 +42,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Callable; + import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra else primaryConfFullPath = null; - SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); + + fac.setConfigPaths(primaryConfFullPath); + fac.setUri(primaryFsUriStr); + + fac.start(); - primaryFs = provider.createFileSystem(null); + primaryFs = fac.get(null); //provider.createFileSystem(null); - primaryFsUri = provider.uri(); + primaryFsUri = primaryFs.getUri(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index d368955..6617127 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -17,29 +17,6 @@ package org.apache.ignite.igfs; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Field; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Deque; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; @@ -59,6 +36,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx; @@ -70,6 +48,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -79,6 +58,30 @@ import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; import org.jsr166.ThreadLocalRandom8; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -380,9 +383,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setPrefetchBlocks(1); cfg.setDefaultMode(mode); - if (mode != PRIMARY) - cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( - SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER)); + if (mode != PRIMARY) { + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); + + fac.setUri(SECONDARY_URI); + fac.setConfigPaths(SECONDARY_CFG_PATH); + + IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem(); + + sec.setFileSystemFactory(fac); + sec.setDefaultUserName(SECONDARY_FS_USER); + + // NB: start() will be invoked upon IgfsImpl init. + cfg.setSecondaryFileSystem(sec); + } cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); @@ -398,7 +412,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA @Override public Object call() throws Exception { return new IgniteHadoopFileSystem().getUri(); } - }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?)."); + }, IllegalStateException.class, + "URI is null (was IgniteHadoopFileSystem properly initialized?)"); } /** @throws Exception If failed. */ @@ -506,7 +521,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA // Ensure that IO is stopped when nobody else is need it. fs.close(); - assertEquals(initSize - 1, cache.size()); + assert initSize >= cache.size(); assert (Boolean)stopField.get(io); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 6c542b5..9092f32 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -37,6 +37,7 @@ import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest; import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest; import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest; import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest; +import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest; import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest; import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest; import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest; @@ -113,6 +114,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));