Repository: incubator-falcon Updated Branches: refs/heads/master b364820dd -> a38cebf42
FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. Contributed by Shwetha GS Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a38cebf4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a38cebf4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a38cebf4 Branch: refs/heads/master Commit: a38cebf42b0addaf8fc2e7892f55166a3cd902b5 Parents: b364820 Author: shwethags <shwetha...@inmobi.com> Authored: Fri Dec 12 10:46:19 2014 +0530 Committer: shwethags <shwetha...@inmobi.com> Committed: Fri Dec 12 10:46:54 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + common/pom.xml | 2 +- .../falcon/catalog/AbstractCatalogService.java | 43 ++--- .../apache/falcon/catalog/CatalogPartition.java | 13 -- .../falcon/catalog/HiveCatalogService.java | 163 ++++++++----------- .../apache/falcon/entity/CatalogStorage.java | 112 +++---------- pom.xml | 15 +- .../FalconAuthenticationFilterTest.java | 2 +- .../apache/falcon/latedata/LateDataHandler.java | 19 ++- .../falcon/catalog/HiveCatalogServiceIT.java | 52 ++---- .../apache/falcon/late/LateDataHandlerIT.java | 6 +- .../lifecycle/TableStorageFeedEvictorIT.java | 13 +- .../org/apache/falcon/resource/TestContext.java | 20 +++ .../org/apache/falcon/util/HiveTestUtils.java | 16 +- 14 files changed, 178 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f671e56..e9758c0 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -32,6 +32,9 @@ Trunk (Unreleased) Seetharam) OPTIMIZATIONS + FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. + (Shwetha GS) + FALCON-423 Updating falcon server endpoint in distributed setup doesn't work. (Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index 50dd2ea..b349c2f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -154,7 +154,7 @@ <dependency> <groupId>org.apache.hive</groupId> - <artifactId>hive-common</artifactId> + <artifactId>hive-metastore</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java index 348fac0..9abdc93 100644 --- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java @@ -22,7 +22,6 @@ import org.apache.falcon.FalconException; import org.apache.hadoop.conf.Configuration; import java.util.List; -import java.util.Map; /** * Interface definition for a catalog registry service @@ -88,47 +87,49 @@ public abstract class AbstractCatalogService { /** * Drops a given partition. Executed in the workflow engine. * - * * @param conf conf object * @param catalogUrl url for the catalog service * @param database database the table belongs to * @param tableName tableName to check if it exists - * @param partitions list of partitions as Key=Value pairs + * @param partitionValues list of partition values + * @param deleteData should dropPartition also delete the corresponding data * @return if the partition was dropped * @throws FalconException */ - public abstract boolean dropPartitions(Configuration conf, String catalogUrl, - String database, String tableName, - Map<String, String> partitions) throws FalconException; + public abstract boolean dropPartition(Configuration conf, String catalogUrl, + String database, String tableName, List<String> partitionValues, + boolean deleteData) throws FalconException; /** - * Gets the partition. Executed in the workflow engine. - * + * Drops the partitions. Executed in the workflow engine. * - * @param conf conf + * @param conf conf object * @param catalogUrl url for the catalog service * @param database database the table belongs to * @param tableName tableName to check if it exists - * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}. - * All partition-key-values must be specified. - * @return An instance of CatalogPartition. + * @param partitionValues list of partition values + * @param deleteData should dropPartition also delete the corresponding data + * @return if the partition was dropped * @throws FalconException */ - public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl, - String database, String tableName, - Map<String, String> partitionSpec) - throws FalconException; + public abstract void dropPartitions(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> partitionValues, boolean deleteData) throws FalconException; /** + * Gets the partition. Executed in the workflow engine. + * * * @param conf conf * @param catalogUrl url for the catalog service * @param database database the table belongs to - * @param tableName table name - * @return list of partition column names of the table + * @param tableName tableName to check if it exists + * @param partitionValues Values for partition columns. + * @return An instance of CatalogPartition. * @throws FalconException */ - public abstract List<String> getTablePartitionCols(Configuration conf, String catalogUrl, - String database, - String tableName) throws FalconException; + public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> partitionValues) + throws FalconException; } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java index c5d4705..032ae38 100644 --- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java +++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java @@ -30,7 +30,6 @@ public class CatalogPartition { private List<String> values; private long createTime; private long lastAccessTime; - private List<String> tableColumns; private String inputFormat; private String outputFormat; private String location; @@ -59,10 +58,6 @@ public class CatalogPartition { this.lastAccessTime = lastAccessTime; } - protected void setTableColumns(List<String> tableColumns) { - this.tableColumns = tableColumns; - } - protected void setInputFormat(String inputFormat) { this.inputFormat = inputFormat; } @@ -97,14 +92,6 @@ public class CatalogPartition { return this.tableName; } - /** - * Gets the columns of the table. - * - * @return the columns - */ - public List<String> getColumns() { - return this.tableColumns; - } /** * Gets the input format. http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java index 3216f1e..f59b83b 100644 --- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java @@ -24,6 +24,11 @@ import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.Credentials; @@ -31,12 +36,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.io.Text; import org.apache.hcatalog.api.HCatClient; -import org.apache.hcatalog.api.HCatDatabase; -import org.apache.hcatalog.api.HCatPartition; -import org.apache.hcatalog.api.HCatTable; import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.apache.hcatalog.common.HCatException; -import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * An implementation of CatalogService that uses Hive Meta Store (HCatalog) @@ -55,25 +54,8 @@ public class HiveCatalogService extends AbstractCatalogService { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class); - /** - * This is only used for tests. - * - * @param metastoreUrl metastore url - * @return client object - * @throws FalconException - */ - public static HCatClient getHCatClient(String metastoreUrl) throws FalconException { - try { - HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl); - return HCatClient.create(hcatConf); - } catch (HCatException e) { - throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e); - } catch (IOException e) { - throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e); - } - } - private static HiveConf createHiveConf(Configuration conf, + public static HiveConf createHiveConf(Configuration conf, String metastoreUrl) throws IOException { HiveConf hcatConf = new HiveConf(conf, HiveConf.class); @@ -97,8 +79,8 @@ public class HiveCatalogService extends AbstractCatalogService { * @return hive metastore client handle * @throws FalconException */ - private static HCatClient createHCatClient(Configuration conf, - String metastoreUrl) throws FalconException { + private static HiveMetaStoreClient createClient(Configuration conf, + String metastoreUrl) throws FalconException { try { LOG.info("Creating HCatalog client object for metastore {} using conf {}", metastoreUrl, conf.toString()); @@ -117,11 +99,9 @@ public class HiveCatalogService extends AbstractCatalogService { OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf "); - return HCatClient.create(hcatConf); - } catch (HCatException e) { - throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e); - } catch (IOException e) { - throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e); + return new HiveMetaStoreClient(hcatConf); + } catch (Exception e) { + throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e); } } @@ -158,8 +138,8 @@ public class HiveCatalogService extends AbstractCatalogService { * @return hive metastore client handle * @throws FalconException */ - private static HCatClient createProxiedHCatClient(Configuration conf, - String catalogUrl) throws FalconException { + private static HiveMetaStoreClient createProxiedClient(Configuration conf, + String catalogUrl) throws FalconException { try { final HiveConf hcatConf = createHiveConf(conf, catalogUrl); @@ -167,15 +147,13 @@ public class HiveCatalogService extends AbstractCatalogService { addSecureCredentialsAndToken(conf, hcatConf, proxyUGI); LOG.info("Creating HCatalog client object for {}", catalogUrl); - return proxyUGI.doAs(new PrivilegedExceptionAction<HCatClient>() { - public HCatClient run() throws Exception { - return HCatClient.create(hcatConf); + return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() { + public HiveMetaStoreClient run() throws Exception { + return new HiveMetaStoreClient(hcatConf); } }); - } catch (IOException e) { - throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e); - } catch (InterruptedException e) { - throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e); + } catch (Exception e) { + throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e); } } @@ -216,10 +194,10 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Checking if the service is alive for: {}", catalogUrl); try { - HCatClient client = createProxiedHCatClient(conf, catalogUrl); - HCatDatabase database = client.getDatabase("default"); + HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl); + Database database = client.getDatabase("default"); return database != null; - } catch (HCatException e) { + } catch (Exception e) { throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e); } } @@ -230,10 +208,10 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Checking if the table exists: {}", tableName); try { - HCatClient client = createProxiedHCatClient(conf, catalogUrl); - HCatTable table = client.getTable(database, tableName); + HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl); + Table table = client.getTable(database, tableName); return table != null; - } catch (HCatException e) { + } catch (Exception e) { throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e); } } @@ -244,10 +222,10 @@ public class HiveCatalogService extends AbstractCatalogService { LOG.info("Checking if the table is external: {}", tableName); try { - HCatClient client = createHCatClient(conf, catalogUrl); - HCatTable table = client.getTable(database, tableName); - return !table.getTabletype().equals("MANAGED_TABLE"); - } catch (HCatException e) { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + Table table = client.getTable(database, tableName); + return table.getTableType().equals(TableType.EXTERNAL_TABLE.name()); + } catch (Exception e) { throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e); } } @@ -261,89 +239,78 @@ public class HiveCatalogService extends AbstractCatalogService { try { List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>(); - HCatClient client = createHCatClient(conf, catalogUrl); - List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter); - for (HCatPartition hCatPartition : hCatPartitions) { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1); + for (Partition hCatPartition : hCatPartitions) { LOG.info("Partition: " + hCatPartition.getValues()); CatalogPartition partition = createCatalogPartition(hCatPartition); catalogPartitionList.add(partition); } return catalogPartitionList; - } catch (HCatException e) { + } catch (Exception e) { throw new FalconException("Exception listing partitions:" + e.getMessage(), e); } } - private CatalogPartition createCatalogPartition(HCatPartition hCatPartition) { + private CatalogPartition createCatalogPartition(Partition hCatPartition) { final CatalogPartition catalogPartition = new CatalogPartition(); - catalogPartition.setDatabaseName(hCatPartition.getDatabaseName()); + catalogPartition.setDatabaseName(hCatPartition.getDbName()); catalogPartition.setTableName(hCatPartition.getTableName()); catalogPartition.setValues(hCatPartition.getValues()); - catalogPartition.setInputFormat(hCatPartition.getInputFormat()); - catalogPartition.setOutputFormat(hCatPartition.getOutputFormat()); - catalogPartition.setLocation(hCatPartition.getLocation()); - catalogPartition.setSerdeInfo(hCatPartition.getSerDe()); + catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat()); + catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat()); + catalogPartition.setLocation(hCatPartition.getSd().getLocation()); + catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib()); catalogPartition.setCreateTime(hCatPartition.getCreateTime()); catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime()); - - List<String> tableColumns = new ArrayList<String>(); - for (HCatFieldSchema hCatFieldSchema : hCatPartition.getColumns()) { - tableColumns.add(hCatFieldSchema.getName()); - } - catalogPartition.setTableColumns(tableColumns); - return catalogPartition; } @Override - public boolean dropPartitions(Configuration conf, String catalogUrl, + public boolean dropPartition(Configuration conf, String catalogUrl, String database, String tableName, - Map<String, String> partitions) throws FalconException { - LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitions); + List<String> partitionValues, boolean deleteData) throws FalconException { + LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues); try { - HCatClient client = createHCatClient(conf, catalogUrl); - client.dropPartitions(database, tableName, partitions, true); - } catch (HCatException e) { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + return client.dropPartition(database, tableName, partitionValues, deleteData); + } catch (Exception e) { throw new FalconException("Exception dropping partitions:" + e.getMessage(), e); } - - return true; } @Override - public CatalogPartition getPartition(Configuration conf, String catalogUrl, - String database, String tableName, - Map<String, String> partitionSpec) throws FalconException { - LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionSpec); + public void dropPartitions(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> partitionValues, boolean deleteData) throws FalconException { + LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues); try { - HCatClient client = createHCatClient(conf, catalogUrl); - HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec); - return createCatalogPartition(hCatPartition); - } catch (HCatException e) { - throw new FalconException("Exception fetching partition:" + e.getMessage(), e); + HiveMetaStoreClient client = createClient(conf, catalogUrl); + List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1); + for (Partition part : partitions) { + LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues()); + client.dropPartition(database, tableName, part.getValues(), deleteData); + } + } catch (Exception e) { + throw new FalconException("Exception dropping partitions:" + e.getMessage(), e); } } @Override - public List<String> getTablePartitionCols(Configuration conf, String catalogUrl, - String database, - String tableName) throws FalconException { - LOG.info("Fetching partition columns of table: " + tableName); + public CatalogPartition getPartition(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> partitionValues) throws FalconException { + LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues); try { - HCatClient client = createHCatClient(conf, catalogUrl); - HCatTable table = client.getTable(database, tableName); - List<HCatFieldSchema> partSchema = table.getPartCols(); - List<String> partCols = new ArrayList<String>(); - for (HCatFieldSchema part : partSchema) { - partCols.add(part.getName()); - } - return partCols; - } catch (HCatException e) { - throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e); + HiveMetaStoreClient client = createClient(conf, catalogUrl); + Partition hCatPartition = client.getPartition(database, tableName, partitionValues); + return createCatalogPartition(hCatPartition); + } catch (Exception e) { + throw new FalconException("Exception fetching partition:" + e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index e68044a..59f558b 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -20,6 +20,7 @@ package org.apache.falcon.entity; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; +import org.apache.falcon.catalog.AbstractCatalogService; import org.apache.falcon.catalog.CatalogPartition; import org.apache.falcon.catalog.CatalogServiceFactory; import org.apache.falcon.entity.common.FeedDataPath; @@ -46,7 +47,6 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -394,25 +394,25 @@ public class CatalogStorage extends Configured implements Storage { LOG.info("Applying retention on {}, Limit: {}, timezone: {}", getTable(), retentionLimit, timeZone); - // get sorted date partition keys and values - List<String> datedPartKeys = new ArrayList<String>(); - List<String> datedPartValues = new ArrayList<String>(); List<CatalogPartition> toBeDeleted; try { + // get sorted date partition keys and values + List<String> datedPartKeys = new ArrayList<String>(); + List<String> datedPartValues = new ArrayList<String>(); fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone); toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues); - } catch (ELException e) { throw new FalconException("Couldn't find partitions to be deleted", e); } + if (toBeDeleted.isEmpty()) { LOG.info("No partitions to delete."); } else { final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal( getConf(), getCatalogUrl(), getDatabase(), getTable()); try { - dropPartitions(toBeDeleted, datedPartKeys, isTableExternal); + dropPartitions(toBeDeleted, isTableExternal); } catch (IOException e) { throw new FalconException("Couldn't drop partitions", e); } @@ -513,95 +513,31 @@ public class CatalogStorage extends Configured implements Storage { return filterBuffer.toString(); } - private void dropPartitions(List<CatalogPartition> partitionsToDelete, List<String> datedPartKeys, - boolean isTableExternal) throws FalconException, IOException { - - // get table partition columns - List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols( - getConf(), getCatalogUrl(), getDatabase(), getTable()); - - /* In case partition columns are a super-set of dated partitions, there can be multiple - * partitions that share the same set of date-partition values. All such partitions can - * be deleted by issuing a single HCatalog dropPartition call per date-partition values. - * Arrange the partitions grouped by each set of date-partition values. - */ - Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap = new HashMap< - Map<String, String>, List<CatalogPartition>>(); - for (CatalogPartition partitionToDrop : partitionsToDelete) { - // create a map of name-values of all columns of this partition - Map<String, String> partitionsMap = new HashMap<String, String>(); - for (int i = 0; i < partColumns.size(); i++) { - partitionsMap.put(partColumns.get(i), partitionToDrop.getValues().get(i)); - } - - // create a map of name-values of dated sub-set of this partition - Map<String, String> datedPartitions = new HashMap<String, String>(); - for (String datedPart : datedPartKeys) { - datedPartitions.put(datedPart, partitionsMap.get(datedPart)); - } + private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal) + throws FalconException, IOException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + for (CatalogPartition partition : partitionsToDelete) { + boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(), + partition.getValues(), true); - // add a map entry of this catalog partition corresponding to its date-partition values - List<CatalogPartition> catalogPartitions; - if (dateToPartitionsMap.containsKey(datedPartitions)) { - catalogPartitions = dateToPartitionsMap.get(datedPartitions); - } else { - catalogPartitions = new ArrayList<CatalogPartition>(); + if (!deleted) { + return; } - catalogPartitions.add(partitionToDrop); - dateToPartitionsMap.put(datedPartitions, catalogPartitions); - } - - // delete each entry within dateToPartitions Map - for (Map.Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) { - dropPartitionInstances(entry.getValue(), entry.getKey(), isTableExternal); - } - } - - private void dropPartitionInstances(List<CatalogPartition> partitionsToDrop, Map<String, String> partSpec, - boolean isTableExternal) throws FalconException, IOException { - - boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions( - getConf(), getCatalogUrl(), getDatabase(), getTable(), partSpec); - - if (!deleted) { - return; - } - for (CatalogPartition partitionToDrop : partitionsToDrop) { if (isTableExternal) { // nuke the dirs if an external table - final String location = partitionToDrop.getLocation(); - final Path path = new Path(location); - deleted = HadoopClientFactory.get() - .createProxiedFileSystem(path.toUri()) .delete(path, true); - } - if (!isTableExternal || deleted) { - // replace ',' with ';' since message producer splits instancePaths string by ',' - String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";"); - LOG.info("Deleted partition: " + partitionInfo); - instanceDates.append(partSpec).append(','); - instancePaths.append(getEvictedPartitionPath(partitionToDrop)) - .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); + final Path path = new Path(partition.getLocation()); + if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) { + throw new FalconException("Failed to delete location " + path + " for partition " + + partition.getValues()); + } } - } - } - private String getEvictedPartitionPath(final CatalogPartition partitionToDrop) { - String uriTemplate = getUriTemplate(); // no need for location type for table - List<String> values = partitionToDrop.getValues(); - StringBuilder partitionPath = new StringBuilder(); - int index = 0; - for (String partitionKey : getDatedPartitionKeys()) { - String dateMask = getPartitionValue(partitionKey); - String date = values.get(index); - - partitionPath.append(uriTemplate.replace(dateMask, date)); - partitionPath.append(CatalogStorage.PARTITION_SEPARATOR); - LOG.info("partitionPath: " + partitionPath); + // replace ',' with ';' since message producer splits instancePaths string by ',' + String partitionInfo = partition.getValues().toString().replace(",", ";"); + LOG.info("Deleted partition: " + partitionInfo); + instanceDates.append(partitionInfo).append(','); + instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); } - partitionPath.setLength(partitionPath.length() - 1); - - LOG.info("Return partitionPath: " + partitionPath); - return partitionPath.toString(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7a8aacc..5a6c095 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,6 @@ <oozie.forcebuild>false</oozie.forcebuild> <activemq.version>5.4.3</activemq.version> <hive.version>0.11.0</hive.version> - <hcatalog.version>0.11.0</hcatalog.version> <jetty.version>6.1.26</jetty.version> <jersey.version>1.9</jersey.version> <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo> @@ -872,23 +871,11 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-common</artifactId> - <version>${hive.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - </exclusions> - </dependency> - <!-- this is needed for embedded oozie --> <dependency> <groupId>org.apache.hcatalog</groupId> <artifactId>webhcat-java-client</artifactId> - <version>${hcatalog.version}</version> + <version>${hive.version}</version> <exclusions> <exclusion> <!-- This implies you cannot use orc files --> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java index 4ceca29..9e8c76a 100644 --- a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java +++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java @@ -184,7 +184,7 @@ public class FalconAuthenticationFilterTest { public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception { String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL); - String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com"; + String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName().toLowerCase() + "@Example.com"; try { Configuration conf = new Configuration(false); conf.set("hadoop.security.authentication", "kerberos"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java index d5b7db0..d35abfa 100644 --- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java +++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.Map; @@ -153,7 +154,7 @@ public class LateDataHandler extends Configured implements Tool { * The assumption is that if a partition has changed or reinstated, the underlying * metric would change, either size or create time. * - * @param feedUriTemplate URI for the feed storage, filesystem path or table uri + * @param feedUri URI for the feed storage, filesystem path or table uri * @param feedStorageType feed storage type * @param conf configuration * @return computed metric @@ -161,19 +162,19 @@ public class LateDataHandler extends Configured implements Tool { * @throws FalconException * @throws URISyntaxException */ - public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf) + public long computeStorageMetric(String feedUri, String feedStorageType, Configuration conf) throws IOException, FalconException, URISyntaxException { Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType); if (storageType == Storage.TYPE.FILESYSTEM) { // usage on file system is the metric - return getFileSystemUsageMetric(feedUriTemplate, conf); + return getFileSystemUsageMetric(feedUri, conf); } else if (storageType == Storage.TYPE.TABLE) { // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme - feedUriTemplate = feedUriTemplate.replace("hcat", "thrift"); + feedUri = feedUri.replace("hcat", "thrift"); // creation time of the given partition is the metric - return getTablePartitionCreateTimeMetric(feedUriTemplate); + return getTablePartitionCreateTimeMetric(feedUri); } throw new IllegalArgumentException("Unknown storage type: " + feedStorageType); @@ -222,20 +223,20 @@ public class LateDataHandler extends Configured implements Tool { * If this partition was reinstated, the assumption is that the create time of * this partition would change. * - * @param feedUriTemplate catalog table uri + * @param feedUri catalog table uri * @return metric as creation time of the given partition * @throws IOException * @throws URISyntaxException * @throws FalconException */ - private long getTablePartitionCreateTimeMetric(String feedUriTemplate) + private long getTablePartitionCreateTimeMetric(String feedUri) throws IOException, URISyntaxException, FalconException { CatalogStorage storage = (CatalogStorage) - FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate, getConf()); + FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUri, getConf()); CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition( getConf(), storage.getCatalogUrl(), storage.getDatabase(), - storage.getTable(), storage.getPartitions()); + storage.getTable(), new ArrayList(storage.getPartitions().values())); return partition == null ? 0 : partition.getCreateTime(); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java index fcf7f96..6fd23a0 100644 --- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java +++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java @@ -38,8 +38,8 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -64,7 +64,7 @@ public class HiveCatalogServiceIT { CurrentUser.authenticate(TestContext.REMOTE_USER); hiveCatalogService = new HiveCatalogService(); - client = HiveCatalogService.getHCatClient(METASTORE_URL); + client = TestContext.getHCatClient(METASTORE_URL); createDatabase(); createTable(); @@ -170,11 +170,6 @@ public class HiveCatalogServiceIT { } @Test - public void testGet() throws Exception { - Assert.assertNotNull(HiveCatalogService.getHCatClient(METASTORE_URL)); - } - - @Test public void testIsAlive() throws Exception { Assert.assertTrue(hiveCatalogService.isAlive(conf, METASTORE_URL)); } @@ -247,7 +242,7 @@ public class HiveCatalogServiceIT { throws Exception { List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter); + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter); Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount); } @@ -272,34 +267,23 @@ public class HiveCatalogServiceIT { @Test public void testDropPartition() throws Exception { - Map<String, String> partialPartitionSpec = new HashMap<String, String>(); - partialPartitionSpec.put("ds", "20130903"); - - Assert.assertTrue(hiveCatalogService.dropPartitions( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec)); - + hiveCatalogService.dropPartition( + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"), true); List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME); - Assert.assertEquals(partitions.size(), 1, "Unexpected number of partitions"); - Assert.assertEquals(new String[]{"20130902", "in"}, + Assert.assertEquals(partitions.size(), 2, "Unexpected number of partitions"); + Assert.assertEquals(new String[]{"20130903", "in"}, partitions.get(0).getValues().toArray(), "Mismatched partition"); - partialPartitionSpec = new HashMap<String, String>(); - partialPartitionSpec.put("ds", "20130902"); - - Assert.assertTrue(hiveCatalogService.dropPartitions( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec)); + hiveCatalogService.dropPartitions( + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130903"), true); partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME); Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions"); } @Test public void testGetPartition() throws Exception { - Map<String, String> partitionSpec = new HashMap<String, String>(); - partitionSpec.put("ds", "20130902"); - partitionSpec.put("region", "in"); - CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec); + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in")); Assert.assertNotNull(partition); long createTime = partition.getCreateTime(); @@ -308,7 +292,7 @@ public class HiveCatalogServiceIT { @Test public void testReInstatePartition() throws Exception { - Map<String, String> partitionSpec = new HashMap<String, String>(); + Map<String, String> partitionSpec = new LinkedHashMap<String, String>(); partitionSpec.put("ds", "20130918"); partitionSpec.put("region", "blah"); @@ -317,7 +301,7 @@ public class HiveCatalogServiceIT { client.addPartition(first); CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec); + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values())); Assert.assertNotNull(partition); final long originalCreateTime = partition.getCreateTime(); @@ -331,7 +315,7 @@ public class HiveCatalogServiceIT { client.addPartition(second); CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition( - conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec); + conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values())); Assert.assertNotNull(reInstatedPartition); final long reInstatedCreateTime = reInstatedPartition.getCreateTime(); @@ -345,14 +329,4 @@ public class HiveCatalogServiceIT { {EXTERNAL_TABLE_NAME}, }; } - - @Test (dataProvider = "tableName") - public void testGetTablePartitionCols(String tableName) throws Exception { - List<String> partCols = CatalogServiceFactory.getCatalogService().getTablePartitionCols( - conf, METASTORE_URL, DATABASE_NAME, tableName); - Assert.assertEquals(partCols.size(), 2); - Collections.sort(partCols); - Assert.assertEquals(partCols.get(0), "ds"); - Assert.assertEquals(partCols.get(1), "region"); - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java index c4e046b..1885bb7 100644 --- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java +++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java @@ -20,7 +20,6 @@ package org.apache.falcon.late; import org.apache.falcon.catalog.CatalogPartition; import org.apache.falcon.catalog.CatalogServiceFactory; -import org.apache.falcon.catalog.HiveCatalogService; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Interfacetype; @@ -42,6 +41,7 @@ import org.testng.annotations.Test; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; @@ -199,7 +199,7 @@ public class LateDataHandlerIT { } private void reinstatePartition() throws Exception { - final HCatClient client = HiveCatalogService.getHCatClient(metastoreUrl); + final HCatClient client = context.getHCatClient(metastoreUrl); Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put("ds", PARTITION_VALUE); @@ -213,7 +213,7 @@ public class LateDataHandlerIT { client.addPartition(reinstatedPartition); CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition( - conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, partitionSpec); + conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values())); Assert.assertNotNull(reInstatedPartition); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java index d508a2d..ba6698c 100644 --- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java +++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java @@ -20,9 +20,9 @@ package org.apache.falcon.lifecycle; import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.Pair; -import org.apache.falcon.catalog.HiveCatalogService; import org.apache.falcon.entity.Storage; import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.resource.TestContext; import org.apache.falcon.retention.FeedEvictor; import org.apache.falcon.util.HiveTestUtils; import org.apache.hadoop.conf.Configuration; @@ -87,7 +87,7 @@ public class TableStorageFeedEvictorIT { public void setUp() throws Exception { FeedEvictor.OUT.set(stream); - client = HiveCatalogService.getHCatClient(METASTORE_URL); + client = TestContext.getHCatClient(METASTORE_URL); HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME); final List<String> partitionKeys = Arrays.asList("ds", "region"); @@ -156,7 +156,7 @@ public class TableStorageFeedEvictorIT { "-retentionType", "instance", "-retentionLimit", retentionLimit, "-timeZone", timeZone, - "-frequency", "daily", + "-frequency", "days(1)", "-logFile", logFile, "-falconFeedStorageType", Storage.TYPE.TABLE.name(), }); @@ -372,15 +372,16 @@ public class TableStorageFeedEvictorIT { FileSystem fs = path.getFileSystem(new Configuration()); for (String candidatePartition : candidatePartitions) { + path = new Path(EXTERNAL_TABLE_LOCATION + candidatePartition); if (isTableExternal) { - touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition); + touch(fs, path.toString()); } Map<String, String> partition = new HashMap<String, String>(); partition.put("ds", candidatePartition); //yyyyMMDD partition.put("region", "in"); HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create( - DATABASE_NAME, tableName, null, partition).build(); + DATABASE_NAME, tableName, isTableExternal ? path.toString() : null, partition).build(); client.addPartition(addPtn); } } @@ -408,7 +409,7 @@ public class TableStorageFeedEvictorIT { } private void touch(FileSystem fs, String path) throws Exception { - fs.create(new Path(path)).close(); + fs.mkdirs(new Path(path)); } private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/resource/TestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java index 64f98d4..23df745 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java @@ -28,6 +28,7 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.net.util.TrustManagerUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconRuntimException; +import org.apache.falcon.catalog.HiveCatalogService; import org.apache.falcon.cli.FalconCLI; import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.client.FalconClient; @@ -42,11 +43,14 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hcatalog.api.HCatClient; import org.testng.Assert; import javax.net.ssl.HostnameVerifier; @@ -123,6 +127,22 @@ public class TestContext { } } + /** + * This is only used for tests. + * + * @param metastoreUrl metastore url + * @return client object + * @throws FalconException + */ + public static HCatClient getHCatClient(String metastoreUrl) throws FalconException { + try { + HiveConf hcatConf = HiveCatalogService.createHiveConf(new Configuration(false), metastoreUrl); + return HCatClient.create(hcatConf); + } catch (Exception e) { + throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e); + } + } + public void configure() throws Exception { try { StartupProperties.get().setProperty( http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java index 3b71f08..9fd1a9d 100644 --- a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java +++ b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java @@ -18,7 +18,7 @@ package org.apache.falcon.util; -import org.apache.falcon.catalog.HiveCatalogService; +import org.apache.falcon.resource.TestContext; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -44,20 +44,20 @@ public final class HiveTestUtils { public static void createDatabase(String metaStoreUrl, String databaseName) throws Exception { - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(databaseName) .ifNotExists(true).build(); client.createDatabase(dbDesc); } public static void dropDatabase(String metaStoreUrl, String databaseName) throws Exception { - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); client.dropDatabase(databaseName, true, HCatClient.DropDBMode.CASCADE); } public static void createTable(String metaStoreUrl, String databaseName, String tableName) throws Exception { - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment")); cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment")); @@ -72,7 +72,7 @@ public final class HiveTestUtils { public static void createTable(String metaStoreUrl, String databaseName, String tableName, List<String> partitionKeys) throws Exception { - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment")); cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment")); @@ -112,7 +112,7 @@ public final class HiveTestUtils { .location(externalLocation) .build(); - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); client.createTable(tableDesc); } @@ -147,7 +147,7 @@ public final class HiveTestUtils { public static void dropTable(String metaStoreUrl, String databaseName, String tableName) throws Exception { - HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl); + HCatClient client = TestContext.getHCatClient(metaStoreUrl); client.dropTable(databaseName, tableName, true); } @@ -182,6 +182,6 @@ public final class HiveTestUtils { Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put(partitionKey, partitionValue); - return HiveCatalogService.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec); + return TestContext.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec); } }