IGNITE-3918: Moved public classes from "ignite-hadoop-impl" to "ignite-hadoop" module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41de3ab5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41de3ab5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41de3ab5 Branch: refs/heads/ignite-1.6.8-hadoop Commit: 41de3ab575a8bfbea07fa661beac1d6e7735ea3b Parents: cb304b1 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Sep 20 15:54:48 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Sep 20 15:54:48 2016 +0300 ---------------------------------------------------------------------- .../hadoop/fs/BasicHadoopFileSystemFactory.java | 275 --------- .../fs/CachingHadoopFileSystemFactory.java | 85 --- .../hadoop/fs/HadoopFileSystemFactory.java | 52 -- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 580 ------------------- .../fs/KerberosHadoopFileSystemFactory.java | 217 ------- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 21 +- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 21 +- .../internal/processors/hadoop/HadoopUtils.java | 4 - .../HadoopBasicFileSystemFactoryDelegate.java | 162 ++++++ .../HadoopCachingFileSystemFactoryDelegate.java | 75 +++ .../HadoopDefaultFileSystemFactoryDelegate.java | 61 ++ ...doopIgfsSecondaryFileSystemDelegateImpl.java | 471 +++++++++++++++ ...HadoopKerberosFileSystemFactoryDelegate.java | 112 ++++ ...KerberosHadoopFileSystemFactorySelfTest.java | 7 +- .../igfs/HadoopFIleSystemFactorySelfTest.java | 55 +- ...adoopIgfsSecondaryFileSystemTestAdapter.java | 10 +- ...oopSecondaryFileSystemConfigurationTest.java | 14 +- .../hadoop/fs/BasicHadoopFileSystemFactory.java | 164 ++++++ .../fs/CachingHadoopFileSystemFactory.java | 41 ++ .../hadoop/fs/HadoopFileSystemFactory.java | 45 ++ .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 257 ++++++++ .../fs/KerberosHadoopFileSystemFactory.java | 142 +++++ .../hadoop/delegate/HadoopDelegateUtils.java | 117 ++++ .../HadoopFileSystemFactoryDelegate.java | 36 ++ 
.../HadoopIgfsSecondaryFileSystemDelegate.java | 28 + 25 files changed, 1796 insertions(+), 1256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java deleted file mode 100644 index a01bfaf..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.hadoop.util.KerberosUserNameMapper; -import org.apache.ignite.hadoop.util.UserNameMapper; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Arrays; - -/** - * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call. - * <p> - * If {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop. - */ -public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware { - /** */ - private static final long serialVersionUID = 0L; - - /** File system URI. */ - private String uri; - - /** File system config paths. */ - private String[] cfgPaths; - - /** User name mapper. */ - private UserNameMapper usrNameMapper; - - /** Configuration of the secondary filesystem, never null. */ - protected transient Configuration cfg; - - /** Resulting URI. */ - protected transient URI fullUri; - - /** - * Constructor. - */ - public BasicHadoopFileSystemFactory() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public final FileSystem get(String name) throws IOException { - String name0 = IgfsUtils.fixUserName(name); - - if (usrNameMapper != null) - name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0)); - - return getWithMappedName(name0); - } - - /** - * Internal file system create routine. - * - * @param usrName User name. - * @return File system. - * @throws IOException If failed. - */ - protected FileSystem getWithMappedName(String usrName) throws IOException { - assert cfg != null; - - try { - // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. - // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context - // classloader to classloader of current class to avoid strange class-cast-exceptions. - ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); - - try { - return create(usrName); - } - finally { - HadoopUtils.restoreContextClassLoader(oldLdr); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - } - - /** - * Internal file system creation routine, invoked in correct class loader context. - * - * @param usrName User name. - * @return File system. - * @throws IOException If failed. - * @throws InterruptedException if the current thread is interrupted. - */ - protected FileSystem create(String usrName) throws IOException, InterruptedException { - return FileSystem.get(fullUri, cfg, usrName); - } - - /** - * Gets file system URI. - * <p> - * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}. - * <p> - * If not set, default URI will be picked from file system configuration using - * {@link FileSystem#getDefaultUri(Configuration)} method. - * - * @return File system URI. 
- */ - @Nullable public String getUri() { - return uri; - } - - /** - * Sets file system URI. See {@link #getUri()} for more information. - * - * @param uri File system URI. - */ - public void setUri(@Nullable String uri) { - this.uri = uri; - } - - /** - * Gets paths to additional file system configuration files (e.g. core-site.xml). - * <p> - * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable. - * <p> - * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means - * that path order might be important in some cases. - * <p> - * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of - * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well. - * - * @return Paths to file system configuration files. - */ - @Nullable public String[] getConfigPaths() { - return cfgPaths; - } - - /** - * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for - * more information. - * - * @param cfgPaths Paths to file system configuration files. - */ - public void setConfigPaths(@Nullable String... cfgPaths) { - this.cfgPaths = cfgPaths; - } - - /** - * Get optional user name mapper. - * <p> - * When IGFS is invoked from Hadoop, user name is passed along the way to ensure that request will be performed - * with proper user context. User name is passed in a simple form and doesn't contain any extended information, - * such as host, domain or Kerberos realm. You may use name mapper to translate plain user name to full user - * name required by security engine of the underlying file system. - * <p> - * For example you may want to use {@link KerberosUserNameMapper} to user name from {@code "johndoe"} to - * {@code "johndoe@your.realm.com"}. - * - * @return User name mapper. 
- */ - @Nullable public UserNameMapper getUserNameMapper() { - return usrNameMapper; - } - - /** - * Set optional user name mapper. See {@link #getUserNameMapper()} for more information. - * - * @param usrNameMapper User name mapper. - */ - public void setUserNameMapper(@Nullable UserNameMapper usrNameMapper) { - this.usrNameMapper = usrNameMapper; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - cfg = HadoopUtils.safeCreateConfiguration(); - - if (cfgPaths != null) { - for (String cfgPath : cfgPaths) { - if (cfgPath == null) - throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths)); - else { - URL url = U.resolveIgniteUrl(cfgPath); - - if (url == null) { - // If secConfPath is given, it should be resolvable: - throw new IgniteException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + cfgPath); - } - - cfg.addResource(url); - } - } - } - - // If secondary fs URI is not given explicitly, try to get it from the configuration: - if (uri == null) - fullUri = FileSystem.getDefaultUri(cfg); - else { - try { - fullUri = new URI(uri); - } - catch (URISyntaxException use) { - throw new IgniteException("Failed to resolve secondary file system URI: " + uri); - } - } - - if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) - ((LifecycleAware)usrNameMapper).start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) - ((LifecycleAware)usrNameMapper).stop(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, uri); - - if (cfgPaths != null) { - out.writeInt(cfgPaths.length); - - for (String cfgPath : cfgPaths) - U.writeString(out, cfgPath); - } - else - out.writeInt(-1); - - out.writeObject(usrNameMapper); - } - - /** 
{@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - uri = U.readString(in); - - int cfgPathsCnt = in.readInt(); - - if (cfgPathsCnt != -1) { - cfgPaths = new String[cfgPathsCnt]; - - for (int i = 0; i < cfgPathsCnt; i++) - cfgPaths[i] = U.readString(in); - } - - usrNameMapper = (UserNameMapper)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java deleted file mode 100644 index bcbb082..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; - -import java.io.IOException; -import java.net.URI; - -/** - * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on - * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each - * user instead. - * <p> - * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user - * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to - * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation - * calls this method only once per user what may lead to token expiration. In such cases it makes sense to either - * use {@link BasicHadoopFileSystemFactory} or implement your own factory. - */ -public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { - /** */ - private static final long serialVersionUID = 0L; - - /** Per-user file system cache. */ - private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { - @Override public FileSystem createValue(String key) throws IOException { - return CachingHadoopFileSystemFactory.super.getWithMappedName(key); - } - } - ); - - /** - * Public non-arg constructor. 
- */ - public CachingHadoopFileSystemFactory() { - // noop - } - - /** {@inheritDoc} */ - @Override public FileSystem getWithMappedName(String name) throws IOException { - return cache.getOrCreate(name); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - super.start(); - - // Disable caching. - cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - super.stop(); - - try { - cache.close(); - } - catch (IgniteCheckedException ice) { - throw new IgniteException(ice); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java deleted file mode 100644 index 5ad08ab..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.lifecycle.LifecycleAware; - -import java.io.IOException; -import java.io.Serializable; - -/** - * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}. - * <p> - * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required. - * <p> - * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement own caching facility - * or doesn't cache file systems at all. - * <p> - * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be - * performed by Ignite. You may want to implement some initialization or cleanup there. - * <p> - * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the - * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file - * system paths. - */ -public interface HadoopFileSystemFactory extends Serializable { - /** - * Gets file system for the given user name. - * - * @param usrName User name - * @return File system. - * @throws IOException In case of error. 
- */ - public FileSystem get(String usrName) throws IOException; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java deleted file mode 100644 index 6b5c776..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsParentNotDirectoryException; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; -import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; -import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; -import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import 
java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; - -/** - * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}. - * <p> - * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. - */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware, - HadoopPayloadAware { - /** The default user name. It is used if no user context is set. */ - private String dfltUsrName; - - /** Factory. */ - private HadoopFileSystemFactory fsFactory; - - /** - * Default constructor for Spring. - */ - public IgniteHadoopIgfsSecondaryFileSystem() { - // No-op. - } - - /** - * Simple constructor that is to be used by default. - * - * @param uri URI of file system. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. - */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { - this(uri, null, null); - } - - /** - * Constructor. - * - * @param uri URI of file system. - * @param cfgPath Additional path to Hadoop configuration. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. - */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) - throws IgniteCheckedException { - this(uri, cfgPath, null); - } - - /** - * Constructor. - * - * @param uri URI of file system. - * @param cfgPath Additional path to Hadoop configuration. - * @param userName User name. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. 
- */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) throws IgniteCheckedException { - setDefaultUserName(userName); - - CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); - - fac.setUri(uri); - - if (cfgPath != null) - fac.setConfigPaths(cfgPath); - - setFileSystemFactory(fac); - } - - /** - * Gets default user name. - * <p> - * Defines user name which will be used during file system invocation in case no user name is defined explicitly - * through {@link FileSystem#get(URI, Configuration, String)}. - * <p> - * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name - * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or - * {@link IgfsUserContext#doAs(String, Callable)} methods. - * <p> - * If not set value of system property {@code "user.name"} will be used. If this property is not set either, - * {@code "anonymous"} will be used. - * - * @return Default user name. - */ - @Nullable public String getDefaultUserName() { - return dfltUsrName; - } - - /** - * Sets default user name. See {@link #getDefaultUserName()} for details. - * - * @param dfltUsrName Default user name. - */ - public void setDefaultUserName(@Nullable String dfltUsrName) { - this.dfltUsrName = dfltUsrName; - } - - /** - * Gets secondary file system factory. - * <p> - * This factory will be used whenever a call to a target {@link FileSystem} is required. - * <p> - * If not set, {@link CachingHadoopFileSystemFactory} will be used. - * - * @return Secondary file system factory. - */ - public HadoopFileSystemFactory getFileSystemFactory() { - return fsFactory; - } - - /** - * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details. - * - * @param factory Secondary file system factory. 
- */ - public void setFileSystemFactory(HadoopFileSystemFactory factory) { - this.fsFactory = factory; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - URI uri = fileSystemForUser().getUri(); - - return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); - } - - /** - * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. - * - * @param e Exception to check. - * @param detailMsg Detailed error message. - * @return Appropriate exception. - */ - private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - return cast(detailMsg, e); - } - - /** - * Cast IO exception to IGFS exception. - * - * @param e IO exception. - * @return IGFS exception. - */ - public static IgfsException cast(String msg, IOException e) { - if (e instanceof FileNotFoundException) - return new IgfsPathNotFoundException(e); - else if (e instanceof ParentNotDirectoryException) - return new IgfsParentNotDirectoryException(msg, e); - else if (e instanceof PathIsNotEmptyDirectoryException) - return new IgfsDirectoryNotEmptyException(e); - else if (e instanceof PathExistsException) - return new IgfsPathAlreadyExistsException(msg, e); - else - return new IgfsException(msg, e); - } - - /** - * Convert Hadoop FileStatus properties to map. - * - * @param status File status. - * @return IGFS attributes. 
- */ - private static Map<String, String> properties(FileStatus status) { - FsPermission perm = status.getPermission(); - - if (perm == null) - perm = FsPermission.getDefault(); - - HashMap<String, String> res = new HashMap<>(3); - - res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort())); - res.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); - res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - try { - return fileSystemForUser().exists(convert(path)); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); - - final FileSystem fileSys = fileSystemForUser(); - - try { - if (props0.userName() != null || props0.groupName() != null) - fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); - - if (props0.permission() != null) - fileSys.setPermission(convert(path), props0.permission()); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); - } - - //Result is not used in case of secondary FS. - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // Delegate to the secondary file system. 
- try { - if (!fileSystemForUser().rename(convert(src), convert(dest))) - throw new IgfsException("Failed to rename (secondary file system returned false) " + - "[src=" + src + ", dest=" + dest + ']'); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - try { - return fileSystemForUser().delete(convert(path), recursive); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - try { - if (!fileSystemForUser().mkdirs(convert(path))) - throw new IgniteException("Failed to make directories [path=" + path + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { - try { - if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) - throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsPath> res = new ArrayList<>(statuses.length); - - for (FileStatus status : statuses) - res.add(new IgfsPath(path, status.getPath().getName())); - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed 
to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsFile> res = new ArrayList<>(statuses.length); - - for (FileStatus s : statuses) { - IgfsEntryInfo fsInfo = s.isDirectory() ? - IgfsUtils.createDirectory( - IgniteUuid.randomUuid(), - null, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ) : - IgfsUtils.createFile( - IgniteUuid.randomUuid(), - (int)s.getBlockSize(), - s.getLen(), - null, - null, - false, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ); - - res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1)); - } - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, boolean overwrite) { - try { - return fileSystemForUser().create(convert(path), overwrite); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int 
replication, - long blockSize, @Nullable Map<String, String> props) { - HadoopIgfsProperties props0 = - new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); - - try { - return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, - (short) replication, blockSize, null); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + - ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + - ", blockSize=" + blockSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) { - try { - return fileSystemForUser().append(convert(path), bufSize); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) { - try { - final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); - - if (status == null) - return null; - - final Map<String, String> props = properties(status); - - return new IgfsFile() { - @Override public IgfsPath path() { - return path; - } - - @Override public boolean isFile() { - return status.isFile(); - } - - @Override public boolean isDirectory() { - return status.isDirectory(); - } - - @Override public int blockSize() { - // By convention directory has blockSize == 0, while file has blockSize > 0: - return isDirectory() ? 
0 : (int)status.getBlockSize(); - } - - @Override public long groupBlockSize() { - return status.getBlockSize(); - } - - @Override public long accessTime() { - return status.getAccessTime(); - } - - @Override public long modificationTime() { - return status.getModificationTime(); - } - - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - @Nullable @Override public String property(String name, @Nullable String dfltVal) { - String val = props.get(name); - - return val == null ? dfltVal : val; - } - - @Override public long length() { - return status.getLen(); - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return props; - } - }; - } - catch (FileNotFoundException ignore) { - return null; - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get used space size of file system."); - } - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - fileSystemForUser().setTimes(convert(path), modificationTime, accessTime); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed set times for path: " + path); - } - } - - /** - * Gets the underlying {@link FileSystem}. 
- * This method is used solely for testing. - * @return the underlying Hadoop {@link FileSystem}. - */ - public FileSystem fileSystem() { - return fileSystemForUser(); - } - - /** - * Gets the FileSystem for the current context user. - * @return the FileSystem instance, never null. - */ - private FileSystem fileSystemForUser() { - String user = IgfsUserContext.currentUser(); - - if (F.isEmpty(user)) - user = IgfsUtils.fixUserName(dfltUsrName); - - assert !F.isEmpty(user); - - try { - return fsFactory.get(user); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - dfltUsrName = IgfsUtils.fixUserName(dfltUsrName); - - if (fsFactory == null) - fsFactory = new CachingHadoopFileSystemFactory(); - - if (fsFactory instanceof LifecycleAware) - ((LifecycleAware) fsFactory).start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - if (fsFactory instanceof LifecycleAware) - ((LifecycleAware)fsFactory).stop(); - } - - /** {@inheritDoc} */ - @Override public HadoopFileSystemFactory getPayload() { - return fsFactory; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java deleted file mode 100644 index bbfbc59..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.security.PrivilegedExceptionAction; - -/** - * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos. - * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user. - * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details. - * The principal and the key tab name to be used for Kerberos authentication are set explicitly - * in the factory configuration. - * - * <p>This factory does not cache any file system instances. If {@code "fs.[prefix].impl.disable.cache"} is set - * to {@code true}, file system instances will be cached by Hadoop. 
- */ -public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { - /** */ - private static final long serialVersionUID = 0L; - - /** The default interval used to re-login from the key tab, in milliseconds. */ - public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L; - - /** Keytab full file name. */ - private String keyTab; - - /** Keytab principal. */ - private String keyTabPrincipal; - - /** The re-login interval. See {@link #getReloginInterval()} for more information. */ - private long reloginInterval = DFLT_RELOGIN_INTERVAL; - - /** Time of last re-login attempt, in system milliseconds. */ - private transient volatile long lastReloginTime; - - /** - * Constructor. - */ - public KerberosHadoopFileSystemFactory() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public FileSystem getWithMappedName(String name) throws IOException { - reloginIfNeeded(); - - return super.getWithMappedName(name); - } - - /** {@inheritDoc} */ - @Override protected FileSystem create(String usrName) throws IOException, InterruptedException { - UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, - UserGroupInformation.getLoginUser()); - - return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() { - @Override public FileSystem run() throws Exception { - return FileSystem.get(fullUri, cfg); - } - }); - } - - /** - * Gets the key tab principal short name (e.g. "hdfs"). - * - * @return The key tab principal. - */ - @Nullable public String getKeyTabPrincipal() { - return keyTabPrincipal; - } - - /** - * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information. - * - * @param keyTabPrincipal The key tab principal name. - */ - public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) { - this.keyTabPrincipal = keyTabPrincipal; - } - - /** - * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab"). 
- * <p> - * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of - * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well. - * - * @return The key tab file name. - */ - @Nullable public String getKeyTab() { - return keyTab; - } - - /** - * Sets the key tab file name. See {@link #getKeyTab()} for more information. - * - * @param keyTab The key tab file name. - */ - public void setKeyTab(@Nullable String keyTab) { - this.keyTab = keyTab; - } - - /** - * The interval used to re-login from the key tab, in milliseconds. - * Important that the value should not be larger than the Kerberos ticket life time multiplied by 0.2. This is - * because the ticket renew window starts from {@code 0.8 * ticket life time}. - * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min) - * is obeys this rule well. - * - * <p>Zero value means that re-login should be attempted on each file system operation. - * Negative values are not allowed. - * - * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to - * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds - * have passed since the time of the previous login. - * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for - * more detail. - * - * @return The re-login interval, in milliseconds. - */ - public long getReloginInterval() { - return reloginInterval; - } - - /** - * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information. - * - * @param reloginInterval The re-login interval, in milliseconds. 
- */ - public void setReloginInterval(long reloginInterval) { - this.reloginInterval = reloginInterval; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty."); - A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty."); - A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative."); - - super.start(); - - try { - UserGroupInformation.setConfiguration(cfg); - UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab); - } - catch (IOException ioe) { - throw new IgniteException("Failed login from keytab [keyTab=" + keyTab + - ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe); - } - } - - /** - * Re-logins the user if needed. - * First, the re-login interval defined in factory is checked. The re-login attempts will be not more - * frequent than one attempt per {@code reloginInterval}. - * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing - * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login. - * - * <p>This operation expected to be called upon each operation with the file system created with the factory. - * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there - * is no need to invoke it otherwise specially. - * - * @throws IOException If login fails. 
- */ - private void reloginIfNeeded() throws IOException { - long now = System.currentTimeMillis(); - - if (now >= lastReloginTime + reloginInterval) { - UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - - lastReloginTime = now; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - U.writeString(out, keyTab); - U.writeString(out, keyTabPrincipal); - out.writeLong(reloginInterval); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keyTab = U.readString(in); - keyTabPrincipal = U.readString(in); - reloginInterval = in.readLong(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index a06129e..85fc76e 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -42,6 +42,8 @@ import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; @@ -58,7 +60,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import java.io.BufferedOutputStream; @@ -165,7 +166,7 @@ public class IgniteHadoopFileSystem extends FileSystem { private IgfsModeResolver modeRslvr; /** The secondary file system factory. */ - private HadoopFileSystemFactory factory; + private HadoopFileSystemFactoryDelegate factory; /** Management connection flag. */ private boolean mgmt; @@ -332,7 +333,10 @@ public class IgniteHadoopFileSystem extends FileSystem { if (initSecondary) { try { - factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + HadoopFileSystemFactory factory0 = + (HadoopFileSystemFactory)paths.getPayload(getClass().getClassLoader()); + + factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0); } catch (IgniteCheckedException e) { throw new IOException("Failed to get secondary file system factory.", e); @@ -343,11 +347,10 @@ public class IgniteHadoopFileSystem extends FileSystem { IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " + FileSystemConfiguration.class.getName() + "?)"); - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).start(); + factory.start(); try { - FileSystem secFs = factory.get(user); + FileSystem secFs = (FileSystem)factory.get(user); secondaryUri = secFs.getUri(); @@ -423,8 +426,8 @@ public class IgniteHadoopFileSystem extends FileSystem { if (clientLog.isLogEnabled()) clientLog.close(); - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).stop(); + if (factory != null) + factory.stop(); // Reset initialized resources. 
uri = null; @@ -1359,6 +1362,6 @@ public class IgniteHadoopFileSystem extends FileSystem { if (factory == null) return null; - return factory.get(user); + return (FileSystem)factory.get(user); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index bd8ed2d..32e51df 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -46,6 +46,8 @@ import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; @@ -63,7 +65,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import java.io.BufferedOutputStream; @@ -169,7 +170,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea private IgfsModeResolver 
modeRslvr; /** The secondary file system factory. */ - private HadoopFileSystemFactory factory; + private HadoopFileSystemFactoryDelegate factory; /** Whether custom sequential reads before prefetch value is provided. */ private boolean seqReadsBeforePrefetchOverride; @@ -341,7 +342,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (initSecondary) { try { - factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + HadoopFileSystemFactory factory0 = + (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + + factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0); } catch (IgniteCheckedException e) { throw new IOException("Failed to get secondary file system factory.", e); @@ -354,11 +358,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea assert factory != null; - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).start(); + factory.start(); try { - FileSystem secFs = factory.get(user); + FileSystem secFs = (FileSystem)factory.get(user); secondaryUri = secFs.getUri(); @@ -385,8 +388,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.close(); - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).stop(); + if (factory != null) + factory.stop(); // Reset initialized resources. 
rmtClient = null; @@ -1071,6 +1074,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea private FileSystem secondaryFileSystem() throws IOException{ assert factory != null; - return factory.get(user); + return (FileSystem)factory.get(user); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 83ccdf0..08c3cb5 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -25,12 +25,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java new file mode 100644 index 0000000..9e69914 --- /dev/null +++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; + +/** + * Basic Hadoop file system factory delegate. + */ +public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Proxy. 
*/ + protected final HadoopFileSystemFactory proxy; + + /** Configuration of the secondary filesystem, never null. */ + protected Configuration cfg; + + /** Resulting URI. */ + protected URI fullUri; + + /** User name mapper. */ + private UserNameMapper usrNameMapper; + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) { + this.proxy = proxy; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String name) throws IOException { + String name0 = IgfsUtils.fixUserName(name); + + if (usrNameMapper != null) + name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0)); + + return getWithMappedName(name0); + } + + /** + * Internal file system create routine. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. + */ + protected FileSystem getWithMappedName(String usrName) throws IOException { + assert cfg != null; + + try { + // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. + // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context + // classloader to classloader of current class to avoid strange class-cast-exceptions. + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + return create(usrName); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + + /** + * Internal file system creation routine, invoked in correct class loader context. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. + * @throws InterruptedException if the current thread is interrupted. 
+ */ + protected FileSystem create(String usrName) throws IOException, InterruptedException { + return FileSystem.get(fullUri, cfg, usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy; + + cfg = HadoopUtils.safeCreateConfiguration(); + + if (proxy0.getConfigPaths() != null) { + for (String cfgPath : proxy0.getConfigPaths()) { + if (cfgPath == null) + throw new NullPointerException("Configuration path cannot be null: " + + Arrays.toString(proxy0.getConfigPaths())); + else { + URL url = U.resolveIgniteUrl(cfgPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IgniteException("Failed to resolve secondary file system configuration path " + + "(ensure that it exists locally and you have read access to it): " + cfgPath); + } + + cfg.addResource(url); + } + } + } + + // If secondary fs URI is not given explicitly, try to get it from the configuration: + if (proxy0.getUri() == null) + fullUri = FileSystem.getDefaultUri(cfg); + else { + try { + fullUri = new URI(proxy0.getUri()); + } + catch (URISyntaxException use) { + throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri()); + } + } + + usrNameMapper = proxy0.getUserNameMapper(); + + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) + ((LifecycleAware)usrNameMapper).stop(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java new file mode 100644 index 0000000..04bbeb8 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; + +import java.io.IOException; + +/** + * Caching Hadoop file system factory delegate. + */ +public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { + /** Per-user file system cache. 
*/ + private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) throws IOException { + return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key); + } + } + ); + + /** + * Constructor. + * + * @param proxy Proxy. + */ + public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) { + super(proxy); + } + + /** {@inheritDoc} */ + @Override public FileSystem getWithMappedName(String name) throws IOException { + return cache.getOrCreate(name); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + super.start(); + + // Disable caching. + cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + super.stop(); + + try { + cache.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java new file mode 100644 index 0000000..3eb6239 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; + +/** + * Hadoop file system factory delegate for non-standard factories. + */ +public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { + /** Factory. */ + private final HadoopFileSystemFactory factory; + + /** + * Constructor. + * + * @param factory Factory. + */ + public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) { + assert factory != null; + + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String usrName) throws IOException { + return (FileSystem)factory.get(usrName); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (factory instanceof LifecycleAware) + ((LifecycleAware)factory).stop(); + } +}