Repository: ignite Updated Branches: refs/heads/ignite-1.6.8-hadoop cb304b145 -> 41de3ab57
http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java new file mode 100644 index 0000000..bf93f37 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsParentNotDirectoryException; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; +import org.apache.ignite.igfs.IgfsPathNotFoundException; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; +import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Secondary file system implementation. + */ +@SuppressWarnings("unused") +public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate { + /** The default user name. It is used if no user context is set. */ + private final String dfltUsrName; + + /** Factory. */ + private final HadoopFileSystemFactoryDelegate factory; + + /** + * Constructor. + * + * @param proxy Proxy. + */ + @SuppressWarnings("unused") + public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) { + assert proxy.getFileSystemFactory() != null; + + dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName()); + + HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory(); + + if (factory0 == null) + factory0 = new CachingHadoopFileSystemFactory(); + + factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + try { + return fileSystemForUser().exists(convert(path)); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + + final FileSystem fileSys = fileSystemForUser(); + + try { + if (props0.userName() != null || props0.groupName() != null) + fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); + + if (props0.permission() != null) + fileSys.setPermission(convert(path), props0.permission()); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); + } + + //Result is not used in case of secondary FS. 
+ return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // Delegate to the secondary file system. + try { + if (!fileSystemForUser().rename(convert(src), convert(dest))) + throw new IgfsException("Failed to rename (secondary file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + try { + return fileSystemForUser().delete(convert(path), recursive); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + try { + if (!fileSystemForUser().mkdirs(convert(path))) + throw new IgniteException("Failed to make directories [path=" + path + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + try { + if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + try { + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); + + if (statuses == null) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsPath> res = new ArrayList<>(statuses.length); + + for (FileStatus status : statuses) + res.add(new 
IgfsPath(path, status.getPath().getName())); + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + try { + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); + + if (statuses == null) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + + Collection<IgfsFile> res = new ArrayList<>(statuses.length); + + for (FileStatus s : statuses) { + IgfsEntryInfo fsInfo = s.isDirectory() ? + IgfsUtils.createDirectory( + IgniteUuid.randomUuid(), + null, + properties(s), + s.getAccessTime(), + s.getModificationTime() + ) : + IgfsUtils.createFile( + IgniteUuid.randomUuid(), + (int)s.getBlockSize(), + s.getLen(), + null, + null, + false, + properties(s), + s.getAccessTime(), + s.getModificationTime() + ); + + res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1)); + } + + return res; + } + catch (FileNotFoundException ignored) { + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + try { + return fileSystemForUser().create(convert(path), overwrite); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", 
overwrite=" + overwrite + "]"); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + HadoopIgfsProperties props0 = + new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); + + try { + return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short) replication, blockSize, null); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + + ", blockSize=" + blockSize + "]"); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + try { + return fileSystemForUser().append(convert(path), bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) { + try { + final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); + + if (status == null) + return null; + + final Map<String, String> props = properties(status); + + return new IgfsFile() { + @Override public IgfsPath path() { + return path; + } + + @Override public boolean isFile() { + return status.isFile(); + } + + @Override public boolean isDirectory() { + return status.isDirectory(); + } + + @Override public int blockSize() { + // By convention directory has blockSize == 0, while file has blockSize > 0: + return isDirectory() ? 
+ 0 : (int)status.getBlockSize(); + } + + @Override public long groupBlockSize() { + return status.getBlockSize(); + } + + @Override public long accessTime() { + return status.getAccessTime(); + } + + @Override public long modificationTime() { + return status.getModificationTime(); + } + + @Override public String property(String name) throws IllegalArgumentException { + String val = props.get(name); + + if (val == null) + throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); + + return val; + } + + @Nullable @Override public String property(String name, @Nullable String dfltVal) { + String val = props.get(name); + + return val == null ? dfltVal : val; + } + + @Override public long length() { + return status.getLen(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return props; + } + }; + } + catch (FileNotFoundException ignore) { + return null; + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); + } + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + try { + // We don't use FileSystem#getUsed() since it counts only the files + // in the filesystem root, not all the files recursively. + return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to get used space size of file system."); + } + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { + try { + // Note the argument order: Hadoop's FileSystem#setTimes() takes the modification time + // first and the access time second, i.e. the reverse of this method's own parameters. 
+ fileSystemForUser().setTimes(convert(path), modificationTime, accessTime); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed set times for path: " + path); + } + } + + /** {@inheritDoc} */ + public void start() { + factory.start(); + } + + /** {@inheritDoc} */ + public void stop() { + factory.stop(); + } + + /** + * Convert IGFS path into Hadoop path. + * + * @param path IGFS path. + * @return Hadoop path. + */ + private Path convert(IgfsPath path) { + URI uri = fileSystemForUser().getUri(); + + return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); + } + + /** + * Converts a secondary file system I/O error into an IGFS exception by delegating to {@link #cast(String, IOException)}. + * + * @param e Exception to check. + * @param detailMsg Detailed error message. + * @return Appropriate exception. + */ + private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { + return cast(detailMsg, e); + } + + /** + * Cast IO exception to IGFS exception; {@code msg} is not passed on for the "not found" and "directory not empty" cases, which wrap the cause only. + * + * @param e IO exception. + * @return IGFS exception. + */ + public static IgfsException cast(String msg, IOException e) { + if (e instanceof FileNotFoundException) + return new IgfsPathNotFoundException(e); + else if (e instanceof ParentNotDirectoryException) + return new IgfsParentNotDirectoryException(msg, e); + else if (e instanceof PathIsNotEmptyDirectoryException) + return new IgfsDirectoryNotEmptyException(e); + else if (e instanceof PathExistsException) + return new IgfsPathAlreadyExistsException(msg, e); + else + return new IgfsException(msg, e); + } + + /** + * Convert Hadoop FileStatus properties to map. + * + * @param status File status. + * @return IGFS attributes. 
+ */ + private static Map<String, String> properties(FileStatus status) { + FsPermission perm = status.getPermission(); + + if (perm == null) + perm = FsPermission.getDefault(); + + HashMap<String, String> res = new HashMap<>(3); + + res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort())); + res.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); + res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); + + return res; + } + + /** + * Gets the FileSystem for the current context user. + * @return the FileSystem instance, never null. + */ + private FileSystem fileSystemForUser() { + String user = IgfsUserContext.currentUser(); + + if (F.isEmpty(user)) + user = IgfsUtils.fixUserName(dfltUsrName); + + assert !F.isEmpty(user); + + try { + return (FileSystem)factory.get(user); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java new file mode 100644 index 0000000..19c470e --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * Kerberos Hadoop file system factory delegate. + */ +public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { + /** The re-login interval. */ + private long reloginInterval; + + /** Time of last re-login attempt, in system milliseconds. */ + private volatile long lastReloginTime; + + /** + * Constructor. + * + * @param proxy Proxy. 
+ */ + public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) { + super(proxy); + } + + /** {@inheritDoc} */ + @Override public FileSystem getWithMappedName(String name) throws IOException { + reloginIfNeeded(); + + return super.getWithMappedName(name); + } + + /** {@inheritDoc} */ + @Override protected FileSystem create(String usrName) throws IOException, InterruptedException { + UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, + UserGroupInformation.getLoginUser()); + + return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() { + @Override public FileSystem run() throws Exception { + return FileSystem.get(fullUri, cfg); + } + }); + } + + @Override public void start() throws IgniteException { + super.start(); + + KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy; + + A.ensure(!F.isEmpty(proxy0.getKeyTab()), "keyTab cannot not be empty."); + A.ensure(!F.isEmpty(proxy0.getKeyTabPrincipal()), "keyTabPrincipal cannot not be empty."); + A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot not be negative."); + + reloginInterval = proxy0.getReloginInterval(); + + try { + UserGroupInformation.setConfiguration(cfg); + UserGroupInformation.loginUserFromKeytab(proxy0.getKeyTabPrincipal(), proxy0.getKeyTab()); + } + catch (IOException ioe) { + throw new IgniteException("Failed login from keytab [keyTab=" + proxy0.getKeyTab() + + ", keyTabPrincipal=" + proxy0.getKeyTabPrincipal() + ']', ioe); + } + } + + /** + * Re-logins the user if needed. + * First, the re-login interval defined in factory is checked. Re-login attempts will be no more + * frequent than one attempt per {@code reloginInterval}. + * Second, the {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} method is invoked, which gets the existing + * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login. 
+ * + * <p>This operation is expected to be called upon each operation with the file system created with the factory. + * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there + * is no need to invoke it explicitly elsewhere. + * + * @throws IOException If login fails. + */ + private void reloginIfNeeded() throws IOException { + long now = System.currentTimeMillis(); + + if (now >= lastReloginTime + reloginInterval) { + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + + lastReloginTime = now; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java index ea7fa99..6036e3a 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java @@ -25,6 +25,9 @@ import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.util.concurrent.Callable; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; @@ -67,7 +70,9 @@ public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractT GridTestUtils.assertThrows(null, new Callable<Object>() { @Override public Object call() throws 
Exception { - fac.start(); + HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(fac); + + delegate.start(); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java index 5be3a64..a4e818f 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java @@ -26,20 +26,24 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import 
org.jetbrains.annotations.Nullable; -import java.io.Externalizable; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; import java.net.URI; import java.util.concurrent.atomic.AtomicInteger; @@ -176,19 +180,22 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { writeConfigurationToFile(conf); - // Configure factory. - TestFactory factory = new TestFactory(); + // Get file system instance to be used. + CachingHadoopFileSystemFactory delegate = new CachingHadoopFileSystemFactory(); + + delegate.setUri("igfs://secondary:secondary@127.0.0.1:11500/"); + delegate.setConfigPaths(SECONDARY_CFG_PATH); - factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/"); - factory.setConfigPaths(SECONDARY_CFG_PATH); + // Configure factory. + TestFactory factory = new TestFactory(delegate); // Configure file system. - IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem(); + IgniteHadoopIgfsSecondaryFileSystem secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(); - fs.setFileSystemFactory(factory); + secondaryFs.setFileSystemFactory(factory); // Start. - return start("primary", 10500, IgfsMode.DUAL_ASYNC, fs); + return start("primary", 10500, IgfsMode.DUAL_ASYNC, secondaryFs); } /** @@ -292,26 +299,42 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { /** * Test factory. */ - private static class TestFactory extends CachingHadoopFileSystemFactory { + private static class TestFactory implements HadoopFileSystemFactory, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** File system factory. */ + private CachingHadoopFileSystemFactory factory; + + /** File system. */ + private transient HadoopFileSystemFactoryDelegate delegate; + /** - * {@link Externalizable} support. + * Constructor. + * + * @param factory File system factory. */ - public TestFactory() { - // No-op. 
+ public TestFactory(CachingHadoopFileSystemFactory factory) { + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public Object get(String usrName) throws IOException { + return delegate.get(usrName); } /** {@inheritDoc} */ @Override public void start() throws IgniteException { - START_CNT.incrementAndGet(); + delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(factory); - super.start(); + delegate.start(); + + START_CNT.incrementAndGet(); } /** {@inheritDoc} */ @Override public void stop() throws IgniteException { STOP_CNT.incrementAndGet(); - - super.stop(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java index f7af6f0..8e7cdb9 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -39,7 +41,7 @@ import org.apache.ignite.internal.util.typedef.T2; */ public class 
HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter { /** File system factory. */ - private final HadoopFileSystemFactory factory; + private final HadoopFileSystemFactoryDelegate factory; /** * Constructor. @@ -48,7 +50,9 @@ public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFi public HadoopIgfsSecondaryFileSystemTestAdapter(HadoopFileSystemFactory factory) { assert factory != null; - this.factory = factory; + this.factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory); + + this.factory.start(); } /** {@inheritDoc} */ @@ -144,6 +148,6 @@ public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFi * @throws IOException If failed. */ protected FileSystem get() throws IOException { - return factory.get(FileSystemConfiguration.DFLT_USER_NAME); + return (FileSystem)factory.get(FileSystemConfiguration.DFLT_USER_NAME); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index d9b5d66..ef98d49 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -31,6 +31,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import 
org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils; import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; import org.apache.ignite.internal.util.typedef.G; @@ -161,7 +163,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra /** * Executes before each test. - * @throws Exception + * @throws Exception If failed. */ private void before() throws Exception { initSecondary(); @@ -179,16 +181,18 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra fac.setConfigPaths(primaryConfFullPath); fac.setUri(primaryFsUriStr); - fac.start(); + HadoopFileSystemFactoryDelegate facDelegate = HadoopDelegateUtils.fileSystemFactoryDelegate(fac); + + facDelegate.start(); - primaryFs = fac.get(null); //provider.createFileSystem(null); + primaryFs = (FileSystem)facDelegate.get(null); //provider.createFileSystem(null); primaryFsUri = primaryFs.getUri(); } /** * Executes after each test. - * @throws Exception + * @throws Exception If failed. */ private void after() throws Exception { if (primaryFs != null) { @@ -225,7 +229,7 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra /** * Initialize underlying secondary filesystem. * - * @throws Exception + * @throws Exception If failed. 
*/ private void initSecondary() throws Exception { if (passSecondaryConfiguration) { http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java new file mode 100644 index 0000000..89b8028 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.hadoop.util.KerberosUserNameMapper; +import org.apache.ignite.hadoop.util.UserNameMapper; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call. 
+ * <p> + * Unless {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop. + */ +public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** File system URI. */ + private String uri; + + /** File system config paths. */ + private String[] cfgPaths; + + /** User name mapper. */ + private UserNameMapper usrNameMapper; + + /** + * Constructor. + */ + public BasicHadoopFileSystemFactory() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public final Object get(String name) throws IOException { + throw new UnsupportedOperationException("Method should not be called directly."); + } + + /** + * Gets file system URI. + * <p> + * This URI will be used as a first argument when calling {@code FileSystem.get(URI, Configuration, String)}. + * <p> + * If not set, default URI will be picked from file system configuration using + * {@code FileSystem.getDefaultUri(Configuration)} method. + * + * @return File system URI. + */ + @Nullable public String getUri() { + return uri; + } + + /** + * Sets file system URI. See {@link #getUri()} for more information. + * + * @param uri File system URI. + */ + public void setUri(@Nullable String uri) { + this.uri = uri; + } + + /** + * Gets paths to additional file system configuration files (e.g. core-site.xml). + * <p> + * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable. + * <p> + * All provided paths will be loaded in the order they are provided and then applied to {@code Configuration}. It means + * that path order might be important in some cases. + * + * @return Paths to file system configuration files. + */ + @Nullable public String[] getConfigPaths() { + return cfgPaths; + } + + /** + * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for + * more information. 
+ * + * @param cfgPaths Paths to file system configuration files. + */ + public void setConfigPaths(@Nullable String... cfgPaths) { + this.cfgPaths = cfgPaths; + } + + /** + * Get optional user name mapper. + * <p> + * When IGFS is invoked from Hadoop, user name is passed along the way to ensure that request will be performed + * with proper user context. User name is passed in a simple form and doesn't contain any extended information, + * such as host, domain or Kerberos realm. You may use name mapper to translate plain user name to full user + * name required by security engine of the underlying file system. + * <p> + * For example you may want to use {@link KerberosUserNameMapper} to translate user name from {@code "johndoe"} to + * {@code "johndoe@your.realm.com"}. + * + * @return User name mapper. + */ + @Nullable public UserNameMapper getUserNameMapper() { + return usrNameMapper; + } + + /** + * Set optional user name mapper. See {@link #getUserNameMapper()} for more information. + * + * @param usrNameMapper User name mapper. 
+ */ + public void setUserNameMapper(@Nullable UserNameMapper usrNameMapper) { + this.usrNameMapper = usrNameMapper; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, uri); + + if (cfgPaths != null) { + out.writeInt(cfgPaths.length); + + for (String cfgPath : cfgPaths) + U.writeString(out, cfgPath); + } + else + out.writeInt(-1); + + out.writeObject(usrNameMapper); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + uri = U.readString(in); + + int cfgPathsCnt = in.readInt(); + + if (cfgPathsCnt != -1) { + cfgPaths = new String[cfgPathsCnt]; + + for (int i = 0; i < cfgPathsCnt; i++) + cfgPaths[i] = U.readString(in); + } + + usrNameMapper = (UserNameMapper)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java new file mode 100644 index 0000000..b90777c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +/** + * Caching Hadoop file system factory. Caches {@code FileSystem} instances on per-user basis. Doesn't rely on + * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each + * user instead. + * <p> + * This makes cache instance resistant to concurrent calls to {@code FileSystem.close()} in other parts of the user + * code. On the other hand, this might cause problems in some environments. E.g. if Kerberos is enabled, a call to + * {@code FileSystem.get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation + * calls this method only once per user, which may lead to token expiration. In such cases it makes sense to either + * use {@link BasicHadoopFileSystemFactory} or implement your own factory. + */ +public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Constructor. + */ + public CachingHadoopFileSystemFactory() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java new file mode 100644 index 0000000..214328f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Factory for Hadoop {@code FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}. + * <p> + * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required. + * <p> + * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement its own caching facility + * or not cache file systems at all. 
+ * <p> + * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be + * performed by Ignite. You may want to implement some initialization or cleanup there. + */ +public interface HadoopFileSystemFactory extends Serializable { + /** + * Gets file system for the given user name. + * + * @param usrName User name + * @return File system. + * @throws IOException In case of error. + */ + public Object get(String usrName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java new file mode 100644 index 0000000..5fb078a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; +import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate; +import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.OutputStream; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * Secondary file system which delegates calls to Hadoop {@code org.apache.hadoop.fs.FileSystem}. + * <p> + * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. + */ +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware, + HadoopPayloadAware { + /** The default user name. It is used if no user context is set. */ + private String dfltUsrName; + + /** Factory. */ + private HadoopFileSystemFactory factory; + + /** Target. */ + volatile private HadoopIgfsSecondaryFileSystemDelegate target; + + /** + * Default constructor for Spring. + */ + public IgniteHadoopIgfsSecondaryFileSystem() { + // No-op. + } + + /** + * Simple constructor that is to be used by default. + * + * @param uri URI of file system. + * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. 
+ */ + @Deprecated + public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { + this(uri, null, null); + } + + /** + * Constructor. + * + * @param uri URI of file system. + * @param cfgPath Additional path to Hadoop configuration. + * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. + */ + @Deprecated + public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) + throws IgniteCheckedException { + this(uri, cfgPath, null); + } + + /** + * Constructor. + * + * @param uri URI of file system. + * @param cfgPath Additional path to Hadoop configuration. + * @param userName User name. + * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. + */ + @Deprecated + public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, + @Nullable String userName) throws IgniteCheckedException { + setDefaultUserName(userName); + + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); + + fac.setUri(uri); + + if (cfgPath != null) + fac.setConfigPaths(cfgPath); + + setFileSystemFactory(fac); + } + + /** + * Gets default user name. + * <p> + * Defines user name which will be used during file system invocation in case no user name is defined explicitly + * through {@code FileSystem.get(URI, Configuration, String)}. + * <p> + * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name + * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or + * {@link IgfsUserContext#doAs(String, Callable)} methods. + * <p> + * If not set value of system property {@code "user.name"} will be used. If this property is not set either, + * {@code "anonymous"} will be used. + * + * @return Default user name. 
+ */ + @Nullable public String getDefaultUserName() { + return dfltUsrName; + } + + /** + * Sets default user name. See {@link #getDefaultUserName()} for details. + * + * @param dfltUsrName Default user name. + */ + public void setDefaultUserName(@Nullable String dfltUsrName) { + this.dfltUsrName = dfltUsrName; + } + + /** + * Gets secondary file system factory. + * <p> + * This factory will be used whenever a call to a target {@code FileSystem} is required. + * <p> + * If not set, {@link CachingHadoopFileSystemFactory} will be used. + * + * @return Secondary file system factory. + */ + public HadoopFileSystemFactory getFileSystemFactory() { + return factory; + } + + /** + * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details. + * + * @param factory Secondary file system factory. + */ + public void setFileSystemFactory(HadoopFileSystemFactory factory) { + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return target.exists(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + return target.update(path, props); + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + target.rename(src, dest); + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + return target.delete(path, recursive); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + target.mkdirs(path); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + target.mkdirs(path, props); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + return target.listPaths(path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + return target.listFiles(path); + } + + /** {@inheritDoc} */ + @Override public 
IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { + return target.open(path, bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + return target.create(path, overwrite); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + return target.create(path, bufSize, overwrite, replication, blockSize, props); + } + + /** {@inheritDoc} */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + return target.append(path, bufSize, create, props); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) { + return target.info(path); + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + return target.usedSpaceSize(); + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { + target.setTimes(path, accessTime, modificationTime); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + target = HadoopDelegateUtils.secondaryFileSystemDelegate(this); + + target.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (target != null) + target.stop(); + } + + /** {@inheritDoc} */ + @Override public HadoopFileSystemFactory getPayload() { + return factory; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java new file mode 100644 index 0000000..46d626b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos. + * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user. + * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details. + * The principal and the key tab name to be used for Kerberos authentication are set explicitly + * in the factory configuration. + * + * <p>This factory does not cache any file system instances. Unless {@code "fs.[prefix].impl.disable.cache"} is set + * to {@code true}, file system instances will be cached by Hadoop. 
+ */ +public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** The default interval used to re-login from the key tab, in milliseconds. */ + public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L; + + /** Keytab full file name. */ + private String keyTab; + + /** Keytab principal. */ + private String keyTabPrincipal; + + /** The re-login interval. See {@link #getReloginInterval()} for more information. */ + private long reloginInterval = DFLT_RELOGIN_INTERVAL; + + /** + * Constructor. + */ + public KerberosHadoopFileSystemFactory() { + // No-op. + } + + /** + * Gets the key tab principal short name (e.g. "hdfs"). + * + * @return The key tab principal. + */ + @Nullable public String getKeyTabPrincipal() { + return keyTabPrincipal; + } + + /** + * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information. + * + * @param keyTabPrincipal The key tab principal name. + */ + public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) { + this.keyTabPrincipal = keyTabPrincipal; + } + + /** + * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab"). + * + * @return The key tab file name. + */ + @Nullable public String getKeyTab() { + return keyTab; + } + + /** + * Sets the key tab file name. See {@link #getKeyTab()} for more information. + * + * @param keyTab The key tab file name. + */ + public void setKeyTab(@Nullable String keyTab) { + this.keyTab = keyTab; + } + + /** + * The interval used to re-login from the key tab, in milliseconds. + * It is important that the value is not larger than the Kerberos ticket life time multiplied by 0.2. This is + * because the ticket renew window starts from {@code 0.8 * ticket life time}. + * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min) + * obeys this rule well. 
+ * + * <p>Zero value means that re-login should be attempted on each file system operation. + * Negative values are not allowed. + * + * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to + * login if less than {@code org.apache.hadoop.security.UserGroupInformation.MIN_TIME_BEFORE_RELOGIN} milliseconds + * have passed since the time of the previous login. + * + * @return The re-login interval, in milliseconds. + */ + public long getReloginInterval() { + return reloginInterval; + } + + /** + * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information. + * + * @param reloginInterval The re-login interval, in milliseconds. + */ + public void setReloginInterval(long reloginInterval) { + this.reloginInterval = reloginInterval; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + U.writeString(out, keyTab); + U.writeString(out, keyTabPrincipal); + out.writeLong(reloginInterval); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keyTab = U.readString(in); + keyTabPrincipal = U.readString(in); + reloginInterval = in.readLong(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java new file mode 100644 index 0000000..57c8138 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java 
@@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; + +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility methods for Hadoop delegates. + */ +public class HadoopDelegateUtils { + /** Secondary file system delegate class. */ + private static final String SECONDARY_FILE_SYSTEM_CLS = + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegateImpl"; + + /** Default file system factory class. */ + private static final String DFLT_FACTORY_CLS = + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopDefaultFileSystemFactoryDelegate"; + + /** Factory proxy to delegate class name mapping. 
*/ + private static final Map<String, String> FACTORY_CLS_MAP; + + static { + FACTORY_CLS_MAP = new HashMap<>(); + + FACTORY_CLS_MAP.put(BasicHadoopFileSystemFactory.class.getName(), + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopBasicFileSystemFactoryDelegate"); + + FACTORY_CLS_MAP.put(CachingHadoopFileSystemFactory.class.getName(), + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopCachingFileSystemFactoryDelegate"); + + FACTORY_CLS_MAP.put(KerberosHadoopFileSystemFactory.class.getName(), + "org.apache.ignite.internal.processors.hadoop.delegate.HadoopKerberosFileSystemFactoryDelegate"); + } + + /** + * Create delegate for secondary file system. + * + * @param proxy Proxy. + * @return Delegate. + */ + public static HadoopIgfsSecondaryFileSystemDelegate secondaryFileSystemDelegate( + IgniteHadoopIgfsSecondaryFileSystem proxy) { + return newInstance(SECONDARY_FILE_SYSTEM_CLS, proxy); + } + + /** + * Create delegate for certain file system factory. + * + * @param proxy Proxy. + * @return Delegate. + */ + @SuppressWarnings("unchecked") + public static HadoopFileSystemFactoryDelegate fileSystemFactoryDelegate(Object proxy) { + String clsName = FACTORY_CLS_MAP.get(proxy.getClass().getName()); + + if (clsName == null) + clsName = DFLT_FACTORY_CLS; + + return newInstance(clsName, proxy); + } + + /** + * Get new delegate instance. + * + * @param clsName Class name. + * @param proxy Proxy. + * @return Instance. + */ + @SuppressWarnings("unchecked") + private static <T> T newInstance(String clsName, Object proxy) { + try { + Class delegateCls = Class.forName(clsName); + + Constructor[] ctors = delegateCls.getConstructors(); + + assert ctors.length == 1; + + Object res = ctors[0].newInstance(proxy); + + return (T)res; + } + catch (ReflectiveOperationException e) { + throw new IgniteException("Failed to instantiate delegate for proxy [proxy=" + proxy + + ", delegateClsName=" + clsName + ']', e); + } + } + + /** + * Private constructor. 
+ */ + private HadoopDelegateUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java new file mode 100644 index 0000000..f051d62 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; + +/** + * Hadoop file system factory delegate. + */ +public interface HadoopFileSystemFactoryDelegate extends LifecycleAware { + /** + * Gets file system for the given user name. + * + * @param usrName User name + * @return File system. 
+ * @throws IOException In case of error. + */ + public Object get(String usrName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/41de3ab5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java new file mode 100644 index 0000000..e381272 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.delegate; + +import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2; +import org.apache.ignite.lifecycle.LifecycleAware; + +/** + * Interface to secondary file system implementation. 
+ */ +public interface HadoopIgfsSecondaryFileSystemDelegate extends IgfsSecondaryFileSystemV2, LifecycleAware { + // No-op. +}