http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java deleted file mode 100644 index 6b5c776..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsParentNotDirectoryException; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; -import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; -import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; -import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import 
java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; - -/** - * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}. - * <p> - * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. - */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware, - HadoopPayloadAware { - /** The default user name. It is used if no user context is set. */ - private String dfltUsrName; - - /** Factory. */ - private HadoopFileSystemFactory fsFactory; - - /** - * Default constructor for Spring. - */ - public IgniteHadoopIgfsSecondaryFileSystem() { - // No-op. - } - - /** - * Simple constructor that is to be used by default. - * - * @param uri URI of file system. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. - */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { - this(uri, null, null); - } - - /** - * Constructor. - * - * @param uri URI of file system. - * @param cfgPath Additional path to Hadoop configuration. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. - */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) - throws IgniteCheckedException { - this(uri, cfgPath, null); - } - - /** - * Constructor. - * - * @param uri URI of file system. - * @param cfgPath Additional path to Hadoop configuration. - * @param userName User name. - * @throws IgniteCheckedException In case of error. - * @deprecated Use {@link #getFileSystemFactory()} instead. 
- */ - @Deprecated - public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) throws IgniteCheckedException { - setDefaultUserName(userName); - - CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); - - fac.setUri(uri); - - if (cfgPath != null) - fac.setConfigPaths(cfgPath); - - setFileSystemFactory(fac); - } - - /** - * Gets default user name. - * <p> - * Defines user name which will be used during file system invocation in case no user name is defined explicitly - * through {@link FileSystem#get(URI, Configuration, String)}. - * <p> - * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name - * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or - * {@link IgfsUserContext#doAs(String, Callable)} methods. - * <p> - * If not set value of system property {@code "user.name"} will be used. If this property is not set either, - * {@code "anonymous"} will be used. - * - * @return Default user name. - */ - @Nullable public String getDefaultUserName() { - return dfltUsrName; - } - - /** - * Sets default user name. See {@link #getDefaultUserName()} for details. - * - * @param dfltUsrName Default user name. - */ - public void setDefaultUserName(@Nullable String dfltUsrName) { - this.dfltUsrName = dfltUsrName; - } - - /** - * Gets secondary file system factory. - * <p> - * This factory will be used whenever a call to a target {@link FileSystem} is required. - * <p> - * If not set, {@link CachingHadoopFileSystemFactory} will be used. - * - * @return Secondary file system factory. - */ - public HadoopFileSystemFactory getFileSystemFactory() { - return fsFactory; - } - - /** - * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details. - * - * @param factory Secondary file system factory. 
- */ - public void setFileSystemFactory(HadoopFileSystemFactory factory) { - this.fsFactory = factory; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - URI uri = fileSystemForUser().getUri(); - - return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); - } - - /** - * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. - * - * @param e Exception to check. - * @param detailMsg Detailed error message. - * @return Appropriate exception. - */ - private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - return cast(detailMsg, e); - } - - /** - * Cast IO exception to IGFS exception. - * - * @param e IO exception. - * @return IGFS exception. - */ - public static IgfsException cast(String msg, IOException e) { - if (e instanceof FileNotFoundException) - return new IgfsPathNotFoundException(e); - else if (e instanceof ParentNotDirectoryException) - return new IgfsParentNotDirectoryException(msg, e); - else if (e instanceof PathIsNotEmptyDirectoryException) - return new IgfsDirectoryNotEmptyException(e); - else if (e instanceof PathExistsException) - return new IgfsPathAlreadyExistsException(msg, e); - else - return new IgfsException(msg, e); - } - - /** - * Convert Hadoop FileStatus properties to map. - * - * @param status File status. - * @return IGFS attributes. 
- */ - private static Map<String, String> properties(FileStatus status) { - FsPermission perm = status.getPermission(); - - if (perm == null) - perm = FsPermission.getDefault(); - - HashMap<String, String> res = new HashMap<>(3); - - res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort())); - res.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); - res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - try { - return fileSystemForUser().exists(convert(path)); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); - - final FileSystem fileSys = fileSystemForUser(); - - try { - if (props0.userName() != null || props0.groupName() != null) - fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); - - if (props0.permission() != null) - fileSys.setPermission(convert(path), props0.permission()); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); - } - - //Result is not used in case of secondary FS. - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // Delegate to the secondary file system. 
- try { - if (!fileSystemForUser().rename(convert(src), convert(dest))) - throw new IgfsException("Failed to rename (secondary file system returned false) " + - "[src=" + src + ", dest=" + dest + ']'); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - try { - return fileSystemForUser().delete(convert(path), recursive); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - try { - if (!fileSystemForUser().mkdirs(convert(path))) - throw new IgniteException("Failed to make directories [path=" + path + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { - try { - if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) - throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsPath> res = new ArrayList<>(statuses.length); - - for (FileStatus status : statuses) - res.add(new IgfsPath(path, status.getPath().getName())); - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed 
to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsFile> res = new ArrayList<>(statuses.length); - - for (FileStatus s : statuses) { - IgfsEntryInfo fsInfo = s.isDirectory() ? - IgfsUtils.createDirectory( - IgniteUuid.randomUuid(), - null, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ) : - IgfsUtils.createFile( - IgniteUuid.randomUuid(), - (int)s.getBlockSize(), - s.getLen(), - null, - null, - false, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ); - - res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1)); - } - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, boolean overwrite) { - try { - return fileSystemForUser().create(convert(path), overwrite); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int 
replication, - long blockSize, @Nullable Map<String, String> props) { - HadoopIgfsProperties props0 = - new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); - - try { - return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, - (short) replication, blockSize, null); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + - ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + - ", blockSize=" + blockSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) { - try { - return fileSystemForUser().append(convert(path), bufSize); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) { - try { - final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); - - if (status == null) - return null; - - final Map<String, String> props = properties(status); - - return new IgfsFile() { - @Override public IgfsPath path() { - return path; - } - - @Override public boolean isFile() { - return status.isFile(); - } - - @Override public boolean isDirectory() { - return status.isDirectory(); - } - - @Override public int blockSize() { - // By convention directory has blockSize == 0, while file has blockSize > 0: - return isDirectory() ? 
0 : (int)status.getBlockSize(); - } - - @Override public long groupBlockSize() { - return status.getBlockSize(); - } - - @Override public long accessTime() { - return status.getAccessTime(); - } - - @Override public long modificationTime() { - return status.getModificationTime(); - } - - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - @Nullable @Override public String property(String name, @Nullable String dfltVal) { - String val = props.get(name); - - return val == null ? dfltVal : val; - } - - @Override public long length() { - return status.getLen(); - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return props; - } - }; - } - catch (FileNotFoundException ignore) { - return null; - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get used space size of file system."); - } - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - fileSystemForUser().setTimes(convert(path), modificationTime, accessTime); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed set times for path: " + path); - } - } - - /** - * Gets the underlying {@link FileSystem}. 
- * This method is used solely for testing. - * @return the underlying Hadoop {@link FileSystem}. - */ - public FileSystem fileSystem() { - return fileSystemForUser(); - } - - /** - * Gets the FileSystem for the current context user. - * @return the FileSystem instance, never null. - */ - private FileSystem fileSystemForUser() { - String user = IgfsUserContext.currentUser(); - - if (F.isEmpty(user)) - user = IgfsUtils.fixUserName(dfltUsrName); - - assert !F.isEmpty(user); - - try { - return fsFactory.get(user); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - dfltUsrName = IgfsUtils.fixUserName(dfltUsrName); - - if (fsFactory == null) - fsFactory = new CachingHadoopFileSystemFactory(); - - if (fsFactory instanceof LifecycleAware) - ((LifecycleAware) fsFactory).start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - if (fsFactory instanceof LifecycleAware) - ((LifecycleAware)fsFactory).stop(); - } - - /** {@inheritDoc} */ - @Override public HadoopFileSystemFactory getPayload() { - return fsFactory; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java deleted file mode 100644 index bbfbc59..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.security.PrivilegedExceptionAction; - -/** - * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos. - * It uses "impersonation" mechanism, to be able to work on behalf of arbitrary client user. - * Please see https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html for details. - * The principal and the key tab name to be used for Kerberos authentication are set explicitly - * in the factory configuration. - * - * <p>This factory does not cache any file system instances. If {@code "fs.[prefix].impl.disable.cache"} is set - * to {@code true}, file system instances will be cached by Hadoop. - */ -public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { - /** */ - private static final long serialVersionUID = 0L; - - /** The default interval used to re-login from the key tab, in milliseconds. */ - public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L; - - /** Keytab full file name. */ - private String keyTab; - - /** Keytab principal. */ - private String keyTabPrincipal; - - /** The re-login interval. See {@link #getReloginInterval()} for more information. */ - private long reloginInterval = DFLT_RELOGIN_INTERVAL; - - /** Time of last re-login attempt, in system milliseconds. */ - private transient volatile long lastReloginTime; - - /** - * Constructor. 
- */ - public KerberosHadoopFileSystemFactory() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public FileSystem getWithMappedName(String name) throws IOException { - reloginIfNeeded(); - - return super.getWithMappedName(name); - } - - /** {@inheritDoc} */ - @Override protected FileSystem create(String usrName) throws IOException, InterruptedException { - UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, - UserGroupInformation.getLoginUser()); - - return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() { - @Override public FileSystem run() throws Exception { - return FileSystem.get(fullUri, cfg); - } - }); - } - - /** - * Gets the key tab principal short name (e.g. "hdfs"). - * - * @return The key tab principal. - */ - @Nullable public String getKeyTabPrincipal() { - return keyTabPrincipal; - } - - /** - * Set the key tab principal name. See {@link #getKeyTabPrincipal()} for more information. - * - * @param keyTabPrincipal The key tab principal name. - */ - public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) { - this.keyTabPrincipal = keyTabPrincipal; - } - - /** - * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab"). - * <p> - * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of - * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well. - * - * @return The key tab file name. - */ - @Nullable public String getKeyTab() { - return keyTab; - } - - /** - * Sets the key tab file name. See {@link #getKeyTab()} for more information. - * - * @param keyTab The key tab file name. - */ - public void setKeyTab(@Nullable String keyTab) { - this.keyTab = keyTab; - } - - /** - * The interval used to re-login from the key tab, in milliseconds. - * Important that the value should not be larger than the Kerberos ticket life time multiplied by 0.2. 
This is - * because the ticket renew window starts from {@code 0.8 * ticket life time}. - * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min) - * is obeys this rule well. - * - * <p>Zero value means that re-login should be attempted on each file system operation. - * Negative values are not allowed. - * - * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to - * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds - * have passed since the time of the previous login. - * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for - * more detail. - * - * @return The re-login interval, in milliseconds. - */ - public long getReloginInterval() { - return reloginInterval; - } - - /** - * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information. - * - * @param reloginInterval The re-login interval, in milliseconds. - */ - public void setReloginInterval(long reloginInterval) { - this.reloginInterval = reloginInterval; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty."); - A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty."); - A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative."); - - super.start(); - - try { - UserGroupInformation.setConfiguration(cfg); - UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab); - } - catch (IOException ioe) { - throw new IgniteException("Failed login from keytab [keyTab=" + keyTab + - ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe); - } - } - - /** - * Re-logins the user if needed. - * First, the re-login interval defined in factory is checked. The re-login attempts will be not more - * frequent than one attempt per {@code reloginInterval}. 
- * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing - * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login. - * - * <p>This operation expected to be called upon each operation with the file system created with the factory. - * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there - * is no need to invoke it otherwise specially. - * - * @throws IOException If login fails. - */ - private void reloginIfNeeded() throws IOException { - long now = System.currentTimeMillis(); - - if (now >= lastReloginTime + reloginInterval) { - UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - - lastReloginTime = now; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - U.writeString(out, keyTab); - U.writeString(out, keyTabPrincipal); - out.writeLong(reloginInterval); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keyTab = U.readString(in); - keyTabPrincipal = U.readString(in); - reloginInterval = in.readLong(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package-info.java deleted file mode 100644 index 164801f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Ignite Hadoop Accelerator file system API. - */ -package org.apache.ignite.hadoop.fs; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java deleted file mode 100644 index a06129e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ /dev/null @@ -1,1364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs.v1; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.InvalidPathException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Progressable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; -import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; -import org.apache.ignite.internal.processors.igfs.IgfsPaths; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; -import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; -import static org.apache.ignite.igfs.IgfsMode.PROXY; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; -import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH; -import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME; - -/** - * {@code IGFS} Hadoop 1.x file system driver over file system API. To use - * {@code IGFS} as Hadoop file system, you should configure this class - * in Hadoop's {@code core-site.xml} as follows: - * <pre name="code" class="xml"> - * <property> - * <name>fs.default.name</name> - * <value>igfs:///</value> - * </property> - * - * <property> - * <name>fs.igfs.impl</name> - * <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value> - * </property> - * </pre> - * You should also add Ignite JAR and all libraries to Hadoop classpath. To - * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop - * distribution: - * <pre name="code" class="bash"> - * export IGNITE_HOME=/path/to/Ignite/distribution - * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar - * - * for f in $IGNITE_HOME/libs/*.jar; do - * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f; - * done - * </pre> - * <h1 class="header">Data vs Clients Nodes</h1> - * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on - * data nodes. Client nodes are responsible for basic file system operations as well as - * accessing data nodes remotely. Usually, client nodes are started together - * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually - * started together with Hadoop {@code task-tracker} processes. 
- * <p> - * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} - * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. - */ -public class IgniteHadoopFileSystem extends FileSystem { - /** Internal property to indicate management connection. */ - public static final String IGFS_MANAGEMENT = "fs.igfs.management.connection"; - - /** Empty array of file block locations. */ - private static final BlockLocation[] EMPTY_BLOCK_LOCATIONS = new BlockLocation[0]; - - /** Empty array of file statuses. */ - public static final FileStatus[] EMPTY_FILE_STATUS = new FileStatus[0]; - - /** Ensures that close routine is invoked at most once. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Grid remote client. */ - private HadoopIgfsWrapper rmtClient; - - /** working directory. */ - private Path workingDir; - - /** Default replication factor. */ - private short dfltReplication; - - /** Base file system uri. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private URI uri; - - /** Authority. */ - private String uriAuthority; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Secondary URI string. */ - private URI secondaryUri; - - /** The user name this file system was created on behalf of. */ - private String user; - - /** IGFS mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** The secondary file system factory. */ - private HadoopFileSystemFactory factory; - - /** Management connection flag. */ - private boolean mgmt; - - /** Whether custom sequential reads before prefetch value is provided. */ - private boolean seqReadsBeforePrefetchOverride; - - /** IGFS group block size. */ - private long igfsGrpBlockSize; - - /** Flag that controls whether file writes should be colocated. */ - private boolean colocateFileWrites; - - /** Prefer local writes. 
*/ - private boolean preferLocFileWrites; - - /** Custom-provided sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** {@inheritDoc} */ - @Override public URI getUri() { - if (uri == null) - throw new IllegalStateException("URI is null (was IgniteHadoopFileSystem properly initialized?)."); - - return uri; - } - - /** - * Enter busy state. - * - * @throws IOException If file system is stopped. - */ - private void enterBusy() throws IOException { - if (closeGuard.get()) - throw new IOException("File system is stopped."); - } - - /** - * Leave busy state. - */ - private void leaveBusy() { - // No-op. - } - - /** - * Gets non-null user name as per the Hadoop file system viewpoint. - * @return the user name, never null. - */ - public static String getFsHadoopUser() throws IOException { - UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); - - String user = currUgi.getShortUserName(); - - user = IgfsUtils.fixUserName(user); - - assert user != null; - - return user; - } - - /** - * Public setter that can be used by direct users of FS or Visor. - * - * @param colocateFileWrites Whether all ongoing file writes should be colocated. 
- */ - @SuppressWarnings("UnusedDeclaration") - public void colocateFileWrites(boolean colocateFileWrites) { - this.colocateFileWrites = colocateFileWrites; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void initialize(URI name, Configuration cfg) throws IOException { - enterBusy(); - - try { - if (rmtClient != null) - throw new IOException("File system is already initialized: " + rmtClient); - - A.notNull(name, "name"); - A.notNull(cfg, "cfg"); - - super.initialize(name, cfg); - - setConf(cfg); - - mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); - - if (!IGFS_SCHEME.equals(name.getScheme())) - throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + - "://[name]/[optional_path], actual=" + name + ']'); - - uri = name; - - uriAuthority = uri.getAuthority(); - - user = getFsHadoopUser(); - - // Override sequential reads before prefetch if needed. - seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); - - if (seqReadsBeforePrefetch > 0) - seqReadsBeforePrefetchOverride = true; - - // In Ignite replication factor is controlled by data cache affinity. - // We use replication factor to force the whole file to be stored on local node. - dfltReplication = (short)cfg.getInt("dfs.replication", 3); - - // Get file colocation control flag. - colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); - preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); - - // Get log directory. - String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); - - File logDirFile = U.resolveIgnitePath(logDirCfg); - - String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); - - // Handshake. 
- IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); - - igfsGrpBlockSize = handshake.blockSize(); - - IgfsPaths paths = handshake.secondaryPaths(); - - // Initialize client logger. - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); - - if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { - // Initiate client logger. - if (logDir == null) - throw new IOException("Failed to resolve log directory: " + logDirCfg); - - Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); - - clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); - } - else - clientLog = IgfsLogger.disabledLogger(); - - try { - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - } - catch (IgniteCheckedException ice) { - throw new IOException(ice); - } - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) { - for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - if (mode == PROXY) { - initSecondary = true; - - break; - } - } - } - - if (initSecondary) { - try { - factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to get secondary file system factory.", e); - } - - if (factory == null) - throw new IOException("Failed to get secondary file system factory (did you set " + - IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " + - FileSystemConfiguration.class.getName() + "?)"); - - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).start(); - - try { - FileSystem secFs = factory.get(user); - - secondaryUri = secFs.getUri(); - - A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); - } - catch (IOException e) { - if (!mgmt) - 
throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); - else - LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + - "will have no effect): " + e.getMessage()); - } - } - - // set working directory to the home directory of the current Fs user: - setWorkingDirectory(null); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override protected void checkPath(Path path) { - URI uri = path.toUri(); - - if (uri.isAbsolute()) { - if (!F.eq(uri.getScheme(), IGFS_SCHEME)) - throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + - uri.getAuthority() + ']'); - - if (!F.eq(uri.getAuthority(), uriAuthority)) - throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + - uri.getAuthority() + ']'); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public short getDefaultReplication() { - return dfltReplication; - } - - /** {@inheritDoc} */ - @Override protected void finalize() throws Throwable { - super.finalize(); - - close(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) - close0(); - } - - /** - * Closes file system. - * - * @throws IOException If failed. - */ - private void close0() throws IOException { - if (LOG.isDebugEnabled()) - LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); - - if (rmtClient == null) - return; - - super.close(); - - rmtClient.close(false); - - if (clientLog.isLogEnabled()) - clientLog.close(); - - if (factory instanceof LifecycleAware) - ((LifecycleAware) factory).stop(); - - // Reset initialized resources. 
- uri = null; - rmtClient = null; - } - - /** {@inheritDoc} */ - @Override public void setTimes(Path p, long mtime, long atime) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. - return; - } - - secondaryFs.setTimes(toSecondary(p), mtime, atime); - } - else { - IgfsPath path = convert(p); - - rmtClient.setTimes(path, atime, mtime); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setPermission(Path p, FsPermission perm) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. - return; - } - - secondaryFs.setPermission(toSecondary(p), perm); - } - else if (rmtClient.update(convert(p), permission(perm)) == null) { - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setOwner(Path p, String username, String grpName) throws IOException { - A.notNull(p, "p"); - A.notNull(username, "username"); - A.notNull(grpName, "grpName"); - - enterBusy(); - - try { - if (mode(p) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. 
- return; - } - - secondaryFs.setOwner(toSecondary(p), username, grpName); - } - else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, username, - IgfsUtils.PROP_GROUP_NAME, grpName)) == null) { - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to open file (secondary file system is not initialized): " + f); - } - - FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); - - long size = status != null ? status.getLen() : -1; - - long logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, PROXY, bufSize, size); - - return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); - } - else - return is; - } - else { - HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? 
- rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); - - HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); - - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - - return new FSDataInputStream(igfsIn); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream create(Path f, final FsPermission perm, boolean overwrite, int bufSize, - short replication, long blockSize, Progressable progress) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - OutputStream out = null; - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + - path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to create file (secondary file system is not initialized): " + f); - } - - FSDataOutputStream os = - secondaryFs.create(toSecondary(f), perm, overwrite, bufSize, replication, blockSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - Map<String,String> propMap = permission(perm); - - 
propMap.put(IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - - // Create stream and close it in the 'finally' section if any sequential operation failed. - HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, - replication, blockSize, propMap); - - assert stream != null; - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - - HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, - logId); - - bufSize = Math.max(64 * 1024, bufSize); - - out = new BufferedOutputStream(igfsOut, bufSize); - - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - - // Mark stream created successfully. - out = null; - - return res; - } - } - finally { - // Close if failed during stream creation. 
- if (out != null) - U.closeQuiet(out); - - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in append [thread=" + Thread.currentThread().getName() + - ", path=" + path + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to append file (secondary file system is not initialized): " + f); - } - - FSDataOutputStream os = secondaryFs.append(toSecondary(f), bufSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - - return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - HadoopIgfsStreamDelegate stream = rmtClient.append(path, false, null); - - assert stream != null; - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, mode, bufSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - - HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, - logId); - - bufSize = Math.max(64 * 1024, bufSize); - - BufferedOutputStream out = new BufferedOutputStream(igfsOut, bufSize); - - return new FSDataOutputStream(out, null, 0); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean rename(Path src, Path dst) throws IOException { - A.notNull(src, "src"); - A.notNull(dst, "dst"); - - enterBusy(); - - try { - 
IgfsPath srcPath = convert(src); - IgfsPath dstPath = convert(dst); - IgfsMode mode = mode(srcPath); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - return false; - } - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, PROXY, dstPath); - - return secondaryFs.rename(toSecondary(src), toSecondary(dst)); - } - else { - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, mode, dstPath); - - try { - rmtClient.rename(srcPath, dstPath); - } - catch (IOException ioe) { - // Log the exception before rethrowing since it may be ignored: - LOG.warn("Failed to rename [srcPath=" + srcPath + ", dstPath=" + dstPath + ", mode=" + mode + ']', - ioe); - - throw ioe; - } - - return true; - } - } - catch (IOException e) { - // Intentionally ignore IGFS exceptions here to follow Hadoop contract. - if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || - !X.hasCause(e.getCause(), IgfsException.class))) - throw e; - else - return false; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean delete(Path f) throws IOException { - return delete(f, false); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean delete(Path f, boolean recursive) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - return false; - } - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFs.delete(toSecondary(f), recursive); - } - else { - // Will throw exception if delete failed. 
- boolean res = rmtClient.delete(path, recursive); - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); - - return res; - } - } - catch (IOException e) { - // Intentionally ignore IGFS exceptions here to follow Hadoop contract. - if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || - !X.hasCause(e.getCause(), IgfsException.class))) - throw e; - else - return false; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - return EMPTY_FILE_STATUS; - } - - FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); - - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection<IgfsFile> list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List<IgfsFile> files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new 
Path("/user/" + user); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path newPath) { - try { - if (newPath == null) { - Path homeDir = getHomeDirectory(); - - FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs != null) - secondaryFs.setWorkingDirectory(toSecondary(homeDir)); - - workingDir = homeDir; - } - else { - Path fixedNewPath = fixRelativePart(newPath); - - String res = fixedNewPath.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs != null) - secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); - - workingDir = fixedNewPath; - } - } - catch (IOException e) { - throw new RuntimeException("Failed to obtain secondary file system instance.", e); - } - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public boolean mkdirs(Path f, FsPermission perm) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - return false; - } - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, PROXY); - - return secondaryFs.mkdirs(toSecondary(f), perm); - } - else { - boolean mkdirRes = rmtClient.mkdirs(path, permission(perm)); - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, mode); - - return mkdirRes; - } - } - catch (IOException e) { - // Intentionally ignore IGFS exceptions here to follow Hadoop contract. 
- if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || - !X.hasCause(e.getCause(), IgfsException.class))) - throw e; - else - return false; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - if (mode(f) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to get file status (secondary file system is not initialized): " + f); - } - - return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); - } - else { - IgfsFile info = rmtClient.info(convert(f)); - - if (info == null) - throw new FileNotFoundException("File not found: " + f); - - return convert(info); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public ContentSummary getContentSummary(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - if (mode(f) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to get content summary (secondary file system is not initialized): " + - f); - } - - return secondaryFs.getContentSummary(toSecondary(f)); - } - else { - IgfsPathSummary sum = rmtClient.contentSummary(convert(f)); - - return new ContentSummary(sum.totalLength(), sum.filesCount(), sum.directoriesCount(), - -1, sum.totalLength(), rmtClient.fsStatus().spaceTotal()); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public BlockLocation[] getFileBlockLocations(FileStatus status, long start, long len) throws IOException { - A.notNull(status, "status"); - - enterBusy(); - - try { - IgfsPath path = convert(status.getPath()); - - if (mode(status.getPath()) == PROXY) { - final FileSystem secondaryFs = secondaryFileSystem(); - - if (secondaryFs == null) { - assert mgmt; - - return 
EMPTY_BLOCK_LOCATIONS; - } - - Path secPath = toSecondary(status.getPath()); - - return secondaryFs.getFileBlockLocations(secondaryFs.getFileStatus(secPath), start, len); - } - else { - long now = System.currentTimeMillis(); - - List<IgfsBlockLocation> affinity = new ArrayList<>(rmtClient.affinity(path, start, len)); - - BlockLocation[] arr = new BlockLocation[affinity.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(affinity.get(i)); - - if (LOG.isDebugEnabled()) - LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + - (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); - - return arr; - } - } - catch (FileNotFoundException ignored) { - return EMPTY_BLOCK_LOCATIONS; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public long getDefaultBlockSize() { - return igfsGrpBlockSize; - } - - /** - * Resolve path mode. - * - * @param path HDFS path. - * @return Path mode. - */ - public IgfsMode mode(Path path) { - return mode(convert(path)); - } - - /** - * Resolve path mode. - * - * @param path IGFS path. - * @return Path mode. - */ - public IgfsMode mode(IgfsPath path) { - return modeRslvr.resolveMode(path); - } - - /** - * @return {@code true} If secondary file system is initialized. - */ - public boolean hasSecondaryFileSystem() { - return factory != null; - } - - /** - * Convert the given path to path acceptable by the primary file system. - * - * @param path Path. - * @return Primary file system path. - */ - private Path toPrimary(Path path) { - return convertPath(path, uri); - } - - /** - * Convert the given path to path acceptable by the secondary file system. - * - * @param path Path. - * @return Secondary file system path. - */ - private Path toSecondary(Path path) { - assert factory != null; - assert secondaryUri != null; - - return convertPath(path, secondaryUri); - } - - /** - * Convert path using the given new URI. 
- * - * @param path Old path. - * @param newUri New URI. - * @return New path. - */ - private Path convertPath(Path path, URI newUri) { - assert newUri != null; - - if (path != null) { - URI pathUri = path.toUri(); - - try { - return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, - pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); - } - catch (URISyntaxException e) { - throw new IgniteException("Failed to construct secondary file system path from the primary file " + - "system path: " + path, e); - } - } - else - return null; - } - - /** - * Convert a file status obtained from the secondary file system to a status of the primary file system. - * - * @param status Secondary file system status. - * @return Primary file system status. - */ - @SuppressWarnings("deprecation") - private FileStatus toPrimary(FileStatus status) { - return status != null ? new FileStatus(status.getLen(), status.isDir(), status.getReplication(), - status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), - status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - return new Path(IGFS_SCHEME, uriAuthority, path.toString()); - } - - /** - * Convert Hadoop path into IGFS path. - * - * @param path Hadoop path. - * @return IGFS path. - */ - @Nullable private IgfsPath convert(@Nullable Path path) { - if (path == null) - return null; - - return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(convert(workingDir), path.toUri().getPath()); - } - - /** - * Convert IGFS affinity block location into Hadoop affinity block location. - * - * @param block IGFS affinity block location. - * @return Hadoop affinity block location. 
- */ - private BlockLocation convert(IgfsBlockLocation block) { - Collection<String> names = block.names(); - Collection<String> hosts = block.hosts(); - - return new BlockLocation( - names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, - hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, - block.start(), block.length() - ) { - @Override public String toString() { - try { - return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + - ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - /** - * Convert IGFS file information into Hadoop file status. - * - * @param file IGFS file information. - * @return Hadoop file status. - */ - @SuppressWarnings("deprecation") - private FileStatus convert(IgfsFile file) { - return new FileStatus( - file.length(), - file.isDirectory(), - getDefaultReplication(), - file.groupBlockSize(), - file.modificationTime(), - file.accessTime(), - permission(file), - file.property(IgfsUtils.PROP_USER_NAME, user), - file.property(IgfsUtils.PROP_GROUP_NAME, "users"), - convert(file.path())) { - @Override public String toString() { - return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() + - ", mtime=" + getModificationTime() + ", atime=" + getAccessTime() + ']'; - } - }; - } - - /** - * Convert Hadoop permission into IGFS file attribute. - * - * @param perm Hadoop permission. - * @return IGFS attributes. - */ - private Map<String, String> permission(FsPermission perm) { - if (perm == null) - perm = FsPermission.getDefault(); - - return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm)); - } - - /** - * @param perm Permission. - * @return String. - */ - private static String toString(FsPermission perm) { - return String.format("%04o", perm.toShort()); - } - - /** - * Convert IGFS file attributes into Hadoop permission. 
- * - * @param file File info. - * @return Hadoop permission. - */ - private FsPermission permission(IgfsFile file) { - String perm = file.property(IgfsUtils.PROP_PERMISSION, null); - - if (perm == null) - return FsPermission.getDefault(); - - try { - /* The property value is parsed as an octal number (radix 8). */ return new FsPermission((short)Integer.parseInt(perm, 8)); - } - catch (NumberFormatException ignore) { - /* Unparseable value: fall back to the default permission. */ return FsPermission.getDefault(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteHadoopFileSystem.class, this); - } - - /** - * Returns the user name this File System is created on behalf of. - * - * @return The user name. - */ - public String user() { - return user; - } - - /** - * Gets cached or creates a {@link FileSystem} by asking {@code factory} for the file system of {@code user}. - * - * @return The secondary file system, or {@code null} if {@code factory} is {@code null}. - * @throws IOException If the factory fails to provide the file system. - */ - private @Nullable FileSystem secondaryFileSystem() throws IOException{ - if (factory == null) - return null; - - return factory.get(user); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java deleted file mode 100644 index 60e62ca..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Contains Ignite Hadoop 1.x <code>FileSystem</code> implementation. - */ -package org.apache.ignite.hadoop.fs.v1; \ No newline at end of file