Repository: sqoop Updated Branches: refs/heads/sqoop2 e2fc4a75e -> 408e3d566
SQOOP-2709. Sqoop2: HDFS: Make sure impersonation works on secured cluster. (Jarcec via Hari) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/408e3d56 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/408e3d56 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/408e3d56 Branch: refs/heads/sqoop2 Commit: 408e3d5663d511b99511641a5bcf66a7daf6a7af Parents: e2fc4a7 Author: Hari Shreedharan <[email protected]> Authored: Wed Dec 2 12:14:21 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Dec 2 12:14:21 2015 -0800 ---------------------------------------------------------------------- .../sqoop/connector/hdfs/HdfsConstants.java | 2 + .../sqoop/connector/hdfs/HdfsExtractor.java | 3 +- .../connector/hdfs/HdfsFromInitializer.java | 7 +- .../apache/sqoop/connector/hdfs/HdfsLoader.java | 4 +- .../sqoop/connector/hdfs/HdfsPartitioner.java | 4 +- .../sqoop/connector/hdfs/HdfsToDestroyer.java | 4 +- .../sqoop/connector/hdfs/HdfsToInitializer.java | 8 +- .../connector/hdfs/security/SecurityUtils.java | 146 +++++++++++++++++++ .../hdfs/security/TestSecurityUtils.java | 49 +++++++ 9 files changed, 217 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java index 39ee4a3..f06300a 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java @@ -35,4 +35,6 @@ public final class HdfsConstants 
extends Constants { public static final String WORK_DIRECTORY = PREFIX + "work_dir"; public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date"; + + public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens"; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 583acdd..441fe30 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -37,6 +37,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; @@ -60,7 +61,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura @Override public void extract(final ExtractorContext context, final LinkConfiguration linkConfiguration, final FromJobConfiguration jobConfiguration, final HdfsPartition partition) { try { - UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { HdfsUtils.contextToConfiguration(context.getContext(), conf); dataWriter = 
context.getDataWriter(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java index be837ca..3a0d626 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -27,6 +27,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; @@ -62,7 +63,7 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC // In case of incremental import, we need to persist the highest last modified try { - UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { FileSystem fs = FileSystem.get(configuration); Path path = new Path(jobConfig.fromJobConfig.inputDirectory); @@ -89,6 +90,10 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC LOG.info("Maximal age of file is: " + maxModifiedTime); context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime); } + + // Generate 
delegation tokens if we are on secured cluster + SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration); + return null; } }); http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 04acd18..a6551e6 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -34,6 +34,7 @@ import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; @@ -56,8 +57,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { @Override public void load(final LoaderContext context, final LinkConfiguration linkConfiguration, final ToJobConfiguration toJobConfig) throws Exception { - UserGroupInformation.createProxyUser(context.getUser(), - UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { Configuration conf = new Configuration(); HdfsUtils.contextToConfiguration(context.getContext(), conf); 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index 998b903..d01e932 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -45,6 +45,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.IncrementalType; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; @@ -83,8 +84,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi final List<Partition> partitions = new ArrayList<>(); try { - UserGroupInformation.createProxyUser(context.getUser(), - UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory); maxSplitSize = numInputBytes / context.getMaxPartitions(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java ---------------------------------------------------------------------- diff --git 
a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java index 2bad23a..858042c 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; @@ -50,8 +51,7 @@ public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfigura final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory); try { - UserGroupInformation.createProxyUser(context.getUser(), - UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { FileSystem fs = FileSystem.get(configuration); http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java index 5856371..204c978 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java +++ 
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -27,6 +27,7 @@ import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.hdfs.security.SecurityUtils; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; @@ -58,8 +59,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi // Verification that given HDFS directory either don't exists or is empty try { - UserGroupInformation.createProxyUser(context.getUser(), - UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() { + SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { FileSystem fs = FileSystem.get(configuration); Path path = new Path(jobConfig.toJobConfig.outputDirectory); @@ -76,6 +76,10 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi } } } + + // Generate delegation tokens if we are on secured cluster + SecurityUtils.generateDelegationTokens(context.getContext(), path, configuration); + return null; } }); http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java new file mode 100644 index 0000000..0a42936 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java @@ 
-0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.security; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.connector.hdfs.HdfsConstants; +import org.apache.sqoop.job.etl.TransferableContext; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * Sqoop is designed in a way to abstract connectors from execution engine. Hence the security portion + * (like generating and distributing delegation tokens) won't happen automatically for us under the hood + * and we have to do everything manually. 
+ */ +public class SecurityUtils { + + private static final Logger LOG = Logger.getLogger(SecurityUtils.class); + + /** + * Creates proxy user for user who submitted the Sqoop job (e.g. who has issued the "start job" command) + */ + static public UserGroupInformation createProxyUser(TransferableContext context) throws IOException { + return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()); + } + + /** + * Creates proxy user and loads it up with all delegation tokens that we have created ourselves + */ + static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext context) throws IOException { + UserGroupInformation proxyUser = createProxyUser(context); + loadDelegationTokensToUGI(proxyUser, context.getContext()); + + return proxyUser; + } + + /** + * Generate delegation tokens for current user (this code is supposed to run in doAs) and store them + * serialized in given mutable context. + */ + static public void generateDelegationTokens(MutableContext context, Path path, Configuration configuration) throws IOException { + if(!UserGroupInformation.isSecurityEnabled()) { + LOG.info("Running on unsecured cluster, skipping delegation token generation."); + return; + } + + // String representation of all tokens that we will create (most likely single one) + List<String> tokens = new LinkedList<>(); + + Credentials credentials = new Credentials(); + TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration); + for (Token token : credentials.getAllTokens()) { + LOG.info("Generated token: " + token.toString()); + tokens.add(serializeToken(token)); + } + + // The context classes are transferred via "Credentials" rather than with jobconf, so we're not leaking the DT out here + if(tokens.size() > 0) { + context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " ")); + } + } + + /** + * Loads delegation tokens that we created and serialized into the mutable 
context + */ + static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext context) throws IOException { + String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS); + if(tokenList == null) { + LOG.info("No delegation tokens found"); + return; + } + + for(String stringToken: tokenList.split(" ")) { + Token token = deserializeToken(stringToken); + LOG.info("Loaded delegation token: " + token.toString()); + ugi.addToken(token); + } + } + + /** + * Serialize given token into String. + * + * We'll convert token to byte[] using Writable methods for I/O and then Base64 + * encode the bytes to a human readable string. + */ + static public String serializeToken(Token token) throws IOException { + // Serialize the Token to a byte array + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + token.write(dos); + baos.flush(); + + return Base64.encodeBase64String(baos.toByteArray()); + } + + /** + * Deserialize token from given String. + * + * See serializeToken for details how the token is expected to be serialized. 
+ */ + static public Token deserializeToken(String stringToken) throws IOException { + Token token = new Token(); + byte[] tokenBytes = Base64.decodeBase64(stringToken); + + ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes); + DataInputStream dis = new DataInputStream(bais); + token.readFields(dis); + + return token; + } + + private SecurityUtils() { + // Initialization is prohibited + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/408e3d56/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java new file mode 100644 index 0000000..713c704 --- /dev/null +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs.security; + +import org.apache.hadoop.io.Text; +import org.testng.annotations.Test; +import org.apache.hadoop.security.token.Token; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TestSecurityUtils { + + @Test + public void testTokenSerializationDeserialization() throws Exception { + byte[] identifier = "identifier".getBytes(); + byte[] password = "password".getBytes(); + Text kind = new Text("kind"); + Text service = new Text("service"); + + Token token = new Token(identifier, password, kind, service); + String serializedForm = SecurityUtils.serializeToken(token); + assertNotNull(serializedForm); + + Token deserializedToken = SecurityUtils.deserializeToken(serializedForm); + assertNotNull(deserializedToken); + + assertEquals(identifier, deserializedToken.getIdentifier()); + assertEquals(password, deserializedToken.getPassword()); + assertEquals(kind.toString(), deserializedToken.getKind().toString()); + assertEquals(service.toString(), deserializedToken.getService().toString()); + } + +}
