Repository: incubator-falcon
Updated Branches:
  refs/heads/master b4cd214c7 -> 07a0b588e

FALCON-819 Submission of cluster with registry interface fails on secure setup. Contributed by Sowmya Ramesh

Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/07a0b588
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/07a0b588
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/07a0b588

Branch: refs/heads/master
Commit: 07a0b588e6f5392607199dcf7e128fc9a0b85586
Parents: b4cd214
Author: Venkatesh Seetharam <venkat...@apache.org>
Authored: Mon Oct 20 11:56:03 2014 -0700
Committer: Venkatesh Seetharam <venkat...@apache.org>
Committed: Mon Oct 20 11:56:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt | 3 +
 .../falcon/catalog/HiveCatalogService.java | 100 +++++++++----------
 .../falcon/catalog/HiveCatalogServiceIT.java | 4 +-
 .../apache/falcon/late/LateDataHandlerIT.java | 2 +-
 .../lifecycle/TableStorageFeedEvictorIT.java | 2 +-
 .../org/apache/falcon/util/HiveTestUtils.java | 14 +--
 6 files changed, 63 insertions(+), 62 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7af3263..4aca3e7 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -119,6 +119,9 @@ Trunk (Unreleased)
 OPTIMIZATIONS
 BUG FIXES
+ FALCON-819 Submission of cluster with registry interface fails on secure
+ setup (Sowmya Ramesh via Venkatesh Seetharam)
+
 FALCON-804 Remove Oozie 3.* patch files from Falcon (Peeyush Bishnoi via Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git
a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java index b5be4e1..170fef2 100644 --- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java @@ -19,11 +19,12 @@ package org.apache.falcon.catalog; import org.apache.falcon.FalconException; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.security.CurrentUser; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.io.Text; import org.apache.hcatalog.api.HCatClient; import org.apache.hcatalog.api.HCatDatabase; import org.apache.hcatalog.api.HCatPartition; @@ -39,7 +40,6 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * An implementation of CatalogService that uses Hive Meta Store (HCatalog) @@ -49,27 +49,7 @@ public class HiveCatalogService extends AbstractCatalogService { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class); - private static final ConcurrentHashMap<String, HCatClient> CACHE = new ConcurrentHashMap<String, HCatClient>(); - - public static HCatClient get(Cluster cluster) throws FalconException { - assert cluster != null : "Cluster cant be null"; - - String metastoreUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(); - return get(metastoreUrl); - } - - public static synchronized HCatClient get(String metastoreUrl) throws FalconException { - - if (!CACHE.containsKey(metastoreUrl)) { - HCatClient hCatClient = 
getHCatClient(metastoreUrl); - LOG.info("Caching HCatalog client object for {}", metastoreUrl); - CACHE.putIfAbsent(metastoreUrl, hCatClient); - } - - return CACHE.get(metastoreUrl); - } - - private static HCatClient getHCatClient(String metastoreUrl) throws FalconException { + public static HCatClient getHCatClient(String metastoreUrl) throws FalconException { try { HiveConf hcatConf = createHiveConf(metastoreUrl); return HCatClient.create(hcatConf); @@ -93,31 +73,49 @@ public class HiveCatalogService extends AbstractCatalogService { } public static synchronized HCatClient getProxiedClient(String catalogUrl, - String metaStorePrincipal) throws FalconException { - if (!CACHE.containsKey(catalogUrl)) { - try { - final HiveConf hcatConf = createHiveConf(catalogUrl); - if (UserGroupInformation.isSecurityEnabled()) { - hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metaStorePrincipal); - hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); - } + String metaStoreServicePrincipal) + throws FalconException { - LOG.info("Creating and caching HCatalog client object for {}", catalogUrl); - UserGroupInformation currentUser = UserGroupInformation.getLoginUser(); - HCatClient hcatClient = currentUser.doAs(new PrivilegedExceptionAction<HCatClient>() { - public HCatClient run() throws Exception { - return HCatClient.create(hcatConf); - } - }); - CACHE.putIfAbsent(catalogUrl, hcatClient); - } catch (IOException e) { - throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e); - } catch (InterruptedException e) { - throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e); + try { + final HiveConf hcatConf = createHiveConf(catalogUrl); + UserGroupInformation proxyUGI = CurrentUser.getProxyUGI(); + if (UserGroupInformation.isSecurityEnabled()) { + hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, + metaStoreServicePrincipal); + 
hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); + + Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken( + hcatConf, metaStoreServicePrincipal); + proxyUGI.addToken(delegationTokenId); } + + LOG.info("Creating and caching HCatalog client object for {}", catalogUrl); + return proxyUGI.doAs(new PrivilegedExceptionAction<HCatClient>() { + public HCatClient run() throws Exception { + return HCatClient.create(hcatConf); + } + }); + } catch (IOException e) { + throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e); + } catch (InterruptedException e) { + throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), + e); } + } + + private static Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf, + String metaStoreServicePrincipal) + throws IOException { + + HCatClient hcatClient = HCatClient.create(hcatConf); + String delegationToken = hcatClient.getDelegationToken( + CurrentUser.getUser(), metaStoreServicePrincipal); + hcatConf.set("hive.metastore.token.signature", "FalconService"); - return CACHE.get(catalogUrl); + Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>(); + delegationTokenId.decodeFromUrlString(delegationToken); + delegationTokenId.setService(new Text("FalconService")); + return delegationTokenId; } @Override @@ -154,7 +152,7 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Checking if the table is external: {}", tableName); try { - HCatClient client = get(catalogUrl); + HCatClient client = getHCatClient(catalogUrl); HCatTable table = client.getTable(database, tableName); return !table.getTabletype().equals("MANAGED_TABLE"); } catch (HCatException e) { @@ -171,7 +169,7 @@ public class HiveCatalogService extends AbstractCatalogService { try { List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>(); - HCatClient client = get(catalogUrl); + 
HCatClient client = getHCatClient(catalogUrl); List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter); for (HCatPartition hCatPartition : hCatPartitions) { LOG.info("Partition: " + hCatPartition.getValues()); @@ -213,7 +211,7 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitions); try { - HCatClient client = get(catalogUrl); + HCatClient client = getHCatClient(catalogUrl); client.dropPartitions(database, tableName, partitions, true); } catch (HCatException e) { throw new FalconException("Exception dropping partitions:" + e.getMessage(), e); @@ -228,7 +226,7 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionSpec); try { - HCatClient client = get(catalogUrl); + HCatClient client = getHCatClient(catalogUrl); HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec); return createCatalogPartition(hCatPartition); } catch (HCatException e) { @@ -242,7 +240,7 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Fetching partition columns of table: " + tableName); try { - HCatClient client = get(catalogUrl); + HCatClient client = getHCatClient(catalogUrl); HCatTable table = client.getTable(database, tableName); List<HCatFieldSchema> partSchema = table.getPartCols(); List<String> partCols = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java index 6966a8d..04fbf4d 100644 --- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java +++ 
b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java @@ -62,7 +62,7 @@ public class HiveCatalogServiceIT { CurrentUser.authenticate(TestContext.REMOTE_USER); hiveCatalogService = new HiveCatalogService(); - client = HiveCatalogService.get(METASTORE_URL); + client = HiveCatalogService.getHCatClient(METASTORE_URL); createDatabase(); createTable(); @@ -169,7 +169,7 @@ public class HiveCatalogServiceIT { @Test public void testGet() throws Exception { - Assert.assertNotNull(HiveCatalogService.get(METASTORE_URL)); + Assert.assertNotNull(HiveCatalogService.getHCatClient(METASTORE_URL)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java index cdeba63..bfc6f2f 100644 --- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java +++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java @@ -196,7 +196,7 @@ public class LateDataHandlerIT { } private void reinstatePartition() throws Exception { - final HCatClient client = HiveCatalogService.get(metastoreUrl); + final HCatClient client = HiveCatalogService.getHCatClient(metastoreUrl); Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put("ds", PARTITION_VALUE); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java index de6f782..d508a2d 100644 --- 
a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java @@ -87,7 +87,7 @@ public class TableStorageFeedEvictorIT { public void setUp() throws Exception { FeedEvictor.OUT.set(stream); - client = HiveCatalogService.get(METASTORE_URL); + client = HiveCatalogService.getHCatClient(METASTORE_URL); HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME); final List<String> partitionKeys = Arrays.asList("ds", "region"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07a0b588/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java index 5087e20..3b71f08 100644 --- a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java +++ b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java @@ -44,20 +44,20 @@ public final class HiveTestUtils { public static void createDatabase(String metaStoreUrl, String databaseName) throws Exception { - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(databaseName) .ifNotExists(true).build(); client.createDatabase(dbDesc); } public static void dropDatabase(String metaStoreUrl, String databaseName) throws Exception { - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); client.dropDatabase(databaseName, true, HCatClient.DropDBMode.CASCADE); } public static void createTable(String metaStoreUrl, String databaseName, String tableName) throws Exception { - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); 
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment")); cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment")); @@ -72,7 +72,7 @@ public final class HiveTestUtils { public static void createTable(String metaStoreUrl, String databaseName, String tableName, List<String> partitionKeys) throws Exception { - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment")); cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment")); @@ -112,7 +112,7 @@ public final class HiveTestUtils { .location(externalLocation) .build(); - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); client.createTable(tableDesc); } @@ -147,7 +147,7 @@ public final class HiveTestUtils { public static void dropTable(String metaStoreUrl, String databaseName, String tableName) throws Exception { - HCatClient client = HiveCatalogService.get(metaStoreUrl); + HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); client.dropTable(databaseName, tableName, true); } @@ -182,6 +182,6 @@ public final class HiveTestUtils { Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put(partitionKey, partitionValue); - return HiveCatalogService.get(metastoreUrl).getPartition(databaseName, tableName, partitionSpec); + return HiveCatalogService.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec); } }