(TWILL-194) Acquires KMS delegation token correctly - This works around HDFS-10296: when a FileContext object is used to acquire delegation tokens, the KMS delegation token is not included.
This closes #43 on Github. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/390dfabd Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/390dfabd Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/390dfabd Branch: refs/heads/site Commit: 390dfabd6d5c339b049736efdb031846380ae892 Parents: d2a503a Author: Terence Yim <[email protected]> Authored: Mon Mar 27 10:58:54 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Mon Mar 27 12:40:49 2017 -0700 ---------------------------------------------------------------------- .../apache/twill/internal/yarn/YarnUtils.java | 53 +++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/390dfabd/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java index ff8f4bb..e931144 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java @@ -22,9 +22,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.io.DataInputByteBuffer; @@ -53,9 +51,11 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; +import 
java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; /** * Collection of helper methods to simplify YARN calls. @@ -160,26 +160,19 @@ public class YarnUtils { return ImmutableList.of(); } - LocationFactory factory = unwrap(locationFactory); - String renewer = getYarnTokenRenewer(config); - List<Token<?>> tokens = ImmutableList.of(); + FileSystem fileSystem = getFileSystem(locationFactory, config); - if (factory instanceof HDFSLocationFactory) { - FileSystem fs = ((HDFSLocationFactory) factory).getFileSystem(); - Token<?>[] fsTokens = fs.addDelegationTokens(renewer, credentials); - if (fsTokens != null) { - tokens = ImmutableList.copyOf(fsTokens); - } - } else if (factory instanceof FileContextLocationFactory) { - FileContext fc = ((FileContextLocationFactory) factory).getFileContext(); - tokens = fc.getDelegationTokens(new Path(locationFactory.create("/").toURI()), renewer); + if (fileSystem == null) { + LOG.warn("Unexpected: LocationFactory is neither FileContextLocationFactory nor HDFSLocationFactory."); + return ImmutableList.of(); } - for (Token<?> token : tokens) { - credentials.addToken(token.getService(), token); - } + String renewer = YarnUtils.getYarnTokenRenewer(config); + + Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials); + LOG.debug("Added HDFS DelegationTokens: {}", Arrays.toString(tokens)); - return ImmutableList.copyOf(tokens); + return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens); } /** @@ -298,6 +291,7 @@ public class YarnUtils { } try { + //noinspection unchecked return (T) Class.forName(className).newInstance(); } catch (Exception e) { throw Throwables.propagate(e); @@ -319,14 +313,25 @@ public class YarnUtils { } /** - * Unwraps the given {@link LocationFactory} and returns the inner most {@link LocationFactory} which is not - * a {@link ForwardingLocationFactory}. 
+ * Gets the Hadoop {@link FileSystem} from LocationFactory. + * + * @return the Hadoop {@link FileSystem} that represents the filesystem used by the given {@link LocationFactory}; + * {@code null} will be returned if unable to determine the {@link FileSystem}. */ - private static LocationFactory unwrap(LocationFactory locationFactory) { - while (locationFactory instanceof ForwardingLocationFactory) { - locationFactory = ((ForwardingLocationFactory) locationFactory).getDelegate(); + @Nullable + private static FileSystem getFileSystem(LocationFactory locationFactory, Configuration config) throws IOException { + if (locationFactory instanceof HDFSLocationFactory) { + return ((HDFSLocationFactory) locationFactory).getFileSystem(); + } + if (locationFactory instanceof ForwardingLocationFactory) { + return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate(), config); + } + // Due to HDFS-10296, for encrypted file systems, FileContext does not acquire the KMS delegation token + // Since we know we are in Yarn, it is safe to get the FileSystem directly, bypassing LocationFactory. + if (locationFactory instanceof FileContextLocationFactory) { + return FileSystem.get(config); } - return locationFactory; + return null; } private YarnUtils() {
