This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 2d3f047f6b0 HIVE-28601: Leverage configurable getPartitions API in HMS to decrease memory footprint in HS2 (Araika Singh, reviewed by Denys Kuzmenko, Wechar Yu)
2d3f047f6b0 is described below
commit 2d3f047f6b05fd0066ac51832c6c8885be1809c6
Author: NZEC <[email protected]>
AuthorDate: Mon Jan 20 14:40:09 2025 +0530
    HIVE-28601: Leverage configurable getPartitions API in HMS to decrease memory footprint in HS2 (Araika Singh, reviewed by Denys Kuzmenko, Wechar Yu)
Closes #5539
---
.../status/formatter/ShowTableStatusFormatter.java | 17 +-
.../ql/ddl/table/partition/PartitionUtils.java | 23 ++
.../AlterTableExchangePartitionAnalyzer.java | 34 ++-
.../view/create/AbstractCreateViewAnalyzer.java | 15 +-
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 15 +-
.../hive/ql/exec/repl/ReplExternalTables.java | 16 +-
.../repl/bootstrap/load/table/LoadPartitions.java | 15 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 112 +++++++++-
.../hadoop/hive/ql/metadata/PartitionIterable.java | 53 ++++-
.../hadoop/hive/ql/metadata/PartitionTree.java | 70 ++++++
.../ql/metadata/SessionHiveMetaStoreClient.java | 13 ++
.../apache/hadoop/hive/ql/metadata/TempTable.java | 6 +
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 14 +-
.../exec/TestGetPartitionsWithSpecsInBatches.java | 237 +++++++++++++++++++++
.../exchange_partition_neg_partition_missing.q.out | 2 +-
.../hive/metastore/HiveMetaStoreChecker.java | 7 +-
.../GetPartitionProjectionsSpecBuilder.java | 11 +
17 files changed, 633 insertions(+), 27 deletions(-)
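For reviewers skimming the hunks below, the recurring pattern of this change: call sites that previously fetched full Partition objects via db.getPartitions(table) now build a GetPartitionsRequest with a GetProjectionsSpec, so the metastore returns only the projected partition fields and HS2 materializes far less per partition. A minimal sketch of the pattern as it appears throughout the patch (variable names illustrative; `db` and `table` are assumed to be a Hive handle and a ql.metadata.Table):

    GetProjectionsSpec projection = new GetPartitionProjectionsSpecBuilder()
        .addProjectFieldList(Arrays.asList("sd.location")).build();  // project only what the caller reads
    GetPartitionsRequest request = new GetPartitionsRequest(
        table.getDbName(), table.getTableName(), projection, null);  // null filter spec: all partitions
    request.setCatName(table.getCatName());
    List<Partition> partitions = db.getPartitionsWithSpecs(table, request);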
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/show/status/formatter/ShowTableStatusFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/show/status/formatter/ShowTableStatusFormatter.java
index 045bb776d88..7e4947fec83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/show/status/formatter/ShowTableStatusFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/show/status/formatter/ShowTableStatusFormatter.java
@@ -22,6 +22,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -33,6 +36,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -89,7 +93,18 @@ List<Path> getLocations(Hive db, Partition partition, Table table) throws HiveEx
List<Path> locations = new ArrayList<Path>();
if (table.isPartitioned()) {
if (partition == null) {
- for (Partition currPartition : db.getPartitions(table)) {
+ List<Partition> partitions;
+      GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("sd.location")).build();
+      GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
+          getProjectionsSpec, null);
+ request.setCatName(table.getCatName());
+ try {
+ partitions = db.getPartitionsWithSpecs(table, request);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ for (Partition currPartition : partitions) {
if (currPartition.getLocation() != null) {
locations.add(new Path(currPartition.getLocation()));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/PartitionUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/PartitionUtils.java
index d5128fe79f4..3f6181f425f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/PartitionUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/PartitionUtils.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -119,6 +121,27 @@ public static List<Partition> getPartitions(Hive db, Table table, Map<String, St
return partitions;
}
+  public static List<Partition> getPartitionsWithSpecs(Hive db, Table table, GetPartitionsRequest request,
+      boolean throwException) throws SemanticException {
+ List<Partition> partitions = null;
+ try {
+ partitions = db.getPartitionsWithSpecs(table, request);
+ } catch (Exception e) {
+      throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec())
+          + " for the following partition keys: " + tablePartitionColNames(table), e);
+    }
+    if (partitions.isEmpty() && throwException) {
+      throw new SemanticException(toMessage(ErrorMsg.INVALID_PARTITION, request.getFilterSpec())
+          + " for the following partition keys: " + tablePartitionColNames(table));
+ }
+ return partitions;
+ }
+
+ private static String tablePartitionColNames(Table table) {
+ List<FieldSchema> partCols = table.getPartCols();
+ return String.join("/", partCols.toString());
+ }
+
private static String toMessage(ErrorMsg message, Object detail) {
    return detail == null ? message.getMsg() : message.getMsg(detail.toString());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/exchange/AlterTableExchangePartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/exchange/AlterTableExchangePartitionAnalyzer.java
index 5069d67ee1f..cab0258564d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/exchange/AlterTableExchangePartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/exchange/AlterTableExchangePartitionAnalyzer.java
@@ -18,11 +18,18 @@
package org.apache.hadoop.hive.ql.ddl.table.partition.exchange;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
@@ -77,9 +84,20 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
     if (AcidUtils.isTransactionalTable(sourceTable) || AcidUtils.isTransactionalTable(destTable)) {
       throw new SemanticException(ErrorMsg.EXCHANGE_PARTITION_NOT_ALLOWED_WITH_TRANSACTIONAL_TABLES.getMsg());
     }
+    List<String> sourceProjectFilters = MetaStoreUtils.getPvals(sourceTable.getPartCols(), partitionSpecs);
     // check if source partition exists
-    PartitionUtils.getPartitions(db, sourceTable, partitionSpecs, true);
+    GetPartitionsFilterSpec sourcePartitionsFilterSpec = new GetPartitionsFilterSpec();
+    sourcePartitionsFilterSpec.setFilters(sourceProjectFilters);
+    sourcePartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);
+
+    GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
+        .addProjectFieldList(Arrays.asList("values")).build();
+
+    GetPartitionsRequest request = new GetPartitionsRequest(sourceTable.getDbName(), sourceTable.getTableName(),
+        getProjectionsSpec, sourcePartitionsFilterSpec);
+    request.setCatName(sourceTable.getCatName());
+    PartitionUtils.getPartitionsWithSpecs(db, sourceTable, request, true);
     // Verify that the partitions specified are continuous
     // If a subpartition value is specified without specifying a partition's value then we throw an exception
@@ -88,13 +106,23 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
       throw new SemanticException(ErrorMsg.PARTITION_VALUE_NOT_CONTINUOUS.getMsg(partitionSpecs.toString()));
     }
+    List<String> destProjectFilters = MetaStoreUtils.getPvals(destTable.getPartCols(), partitionSpecs);
+
+    // check if dest partition exists
+    GetPartitionsFilterSpec getDestPartitionsFilterSpec = new GetPartitionsFilterSpec();
+    getDestPartitionsFilterSpec.setFilters(destProjectFilters);
+    getDestPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_VALUES);
+
     List<Partition> destPartitions = null;
+    GetPartitionsRequest destRequest = new GetPartitionsRequest(destTable.getDbName(), destTable.getTableName(),
+        getProjectionsSpec, getDestPartitionsFilterSpec);
+    destRequest.setCatName(destTable.getCatName());
     try {
-      destPartitions = PartitionUtils.getPartitions(db, destTable, partitionSpecs, true);
+      destPartitions = PartitionUtils.getPartitionsWithSpecs(db, destTable, destRequest, true);
     } catch (SemanticException ex) {
       // We should expect a semantic exception being thrown as this partition should not be present.
     }
-    if (destPartitions != null) {
+    if (CollectionUtils.isNotEmpty(destPartitions)) {
       // If any destination partition is present then throw a Semantic Exception.
       throw new SemanticException(ErrorMsg.PARTITION_EXISTS.getMsg(destPartitions.toString()));
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/AbstractCreateViewAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/AbstractCreateViewAnalyzer.java
index f889ada9002..ebc80b01652 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/AbstractCreateViewAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/AbstractCreateViewAnalyzer.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hive.ql.ddl.view.create;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -37,6 +41,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.thrift.TException;
/**
* Abstract ancestor of analyzers that can create a view.
@@ -112,9 +117,15 @@ protected void validateReplaceWithPartitions(String viewName, Table oldView, Lis
     String partitionViewErrorMsg = "The following view has partition, it could not be replaced: " + viewName;
     List<Partition> partitions = null;
+    GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
+        .addProjectFieldList(Arrays.asList("values")).build();
+
+    GetPartitionsRequest request = new GetPartitionsRequest(oldView.getDbName(), oldView.getTableName(),
+        getProjectionsSpec, null);
+    request.setCatName(oldView.getCatName());
     try {
-      partitions = db.getPartitions(oldView);
-    } catch (HiveException e) {
+      partitions = db.getPartitionsWithSpecs(oldView, request);
+    } catch (HiveException | TException e) {
       throw new SemanticException(ErrorMsg.REPLACE_VIEW_WITH_PARTITION.getMsg(partitionViewErrorMsg));
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index c621194963d..3097889914d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -27,7 +27,10 @@
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
@@ -397,7 +400,17 @@ private static ArrayList<String> getListing(String dbName, String tableName, Hiv
     // Check if the table is partitioned, in case the table is partitioned we need to check
     // for the partitions listing as well.
     if (table.isPartitioned()) {
-      List<Partition> partitions = hiveDb.getPartitions(table);
+      GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("sd.location")).build();
+      GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
+          getProjectionsSpec, null);
+      request.setCatName(table.getCatName());
+      List<Partition> partitions;
+      try {
+        partitions = hiveDb.getPartitionsWithSpecs(table, request);
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
       for (Partition part : partitions) {
         Path partPath = part.getDataLocation();
         // Build listing for the partition only if it doesn't lie within the table location, else it would have been
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index 8f48a6ddda1..e0ec479f39e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -23,7 +23,10 @@
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
@@ -36,6 +39,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -97,15 +102,20 @@ void dataLocationDump(Table table, FileList fileList, HashMap<String, Boolean> s
}
if (table.isPartitioned()) {
List<Partition> partitions;
+      GetProjectionsSpec projectionSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("sd.location")).build();
+      GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
+          projectionSpec, null);
+      request.setCatName(table.getCatName());
try {
- partitions = Hive.get(hiveConf).getPartitions(table);
- } catch (HiveException e) {
+ partitions = Hive.get(hiveConf).getPartitionsWithSpecs(table, request);
+ } catch (HiveException | TException e) {
if (e.getCause() instanceof NoSuchObjectException) {
          // If the table is dropped while the dump is in progress, just skip the partitions data location dump
LOG.debug(e.getMessage());
return;
}
- throw e;
+ throw new HiveException(e);
}
for (Partition partition : partitions) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 59373f55177..c3760aaeb97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -25,6 +25,9 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionDesc;
@@ -53,6 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -137,7 +141,16 @@ public TaskTracker tasks() throws Exception {
      if (tablesToBootstrap.stream().anyMatch(table.getTableName()::equalsIgnoreCase)) {
Hive hiveDb = Hive.get(context.hiveConf);
// Collect the non-existing partitions to drop.
- List<Partition> partitions = hiveDb.getPartitions(table);
+      GetProjectionsSpec getProjectionsSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("values")).build();
+      GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
+          getProjectionsSpec, null);
+ List<Partition> partitions;
+ try {
+ partitions = hiveDb.getPartitionsWithSpecs(table, request);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
List<String> newParts = event.partitions(tableDesc);
for (Partition part : partitions) {
if (!newParts.contains(part.getName())) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index f447aacdf7d..28f68d159f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -118,10 +118,13 @@
import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
import org.apache.hadoop.hive.metastore.api.GetFunctionsRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.SourceTable;
import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.Batchable;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
import org.apache.hadoop.hive.ql.Context;
@@ -162,6 +165,7 @@
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
@@ -177,6 +181,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
@@ -4053,7 +4058,6 @@ public List<String> getPartitionNames(String dbName, String tblName, short max)
public List<String> getPartitionNames(String dbName, String tblName,
Map<String, String> partSpec, short max) throws HiveException {
- List<String> names = null;
Table t = getTable(dbName, tblName);
if (t.getStorageHandler() != null &&
t.getStorageHandler().alwaysUnpartitioned()) {
return t.getStorageHandler().getPartitionNames(t, partSpec);
@@ -4061,11 +4065,20 @@ public List<String> getPartitionNames(String dbName, String tblName,
List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
+ return getPartitionNamesByPartitionVals(dbName, tblName, pvals, max);
+ }
+
+ // get partition names from provided partition values
+  public List<String> getPartitionNamesByPartitionVals(String dbName, String tblName,
+ List<String> pVals, short max) throws HiveException {
+ List<String> names = null;
+ Table t = getTable(dbName, tblName);
+
try {
GetPartitionNamesPsRequest req = new GetPartitionNamesPsRequest();
req.setTblName(tblName);
req.setDbName(dbName);
- req.setPartValues(pvals);
+ req.setPartValues(pVals);
req.setMaxParts(max);
if (AcidUtils.isTransactionalTable(t)) {
        ValidWriteIdList validWriteIdList = getValidWriteIdList(dbName, tblName);
@@ -4075,10 +4088,10 @@ public List<String> getPartitionNames(String dbName, String tblName,
      GetPartitionNamesPsResponse res = getMSC().listPartitionNamesRequest(req);
names = res.getNames();
} catch (NoSuchObjectException nsoe) {
-      // this means no partition exists for the given partition spec
+      // this means the catName/dbName/tblName are invalid or the table does not exist for the given partition spec
       // key value pairs - thrift cannot handle null return values, hence
-      // listPartitionNames() throws NoSuchObjectException to indicate null partitions
-      return Lists.newArrayList();
+      // listPartitionNames() throws NoSuchObjectException
+      throw new HiveException("Invalid catName/dbName/tableName or table doesn't exist.", nsoe);
} catch (Exception e) {
LOG.error("Failed getPartitionNames", e);
throw new HiveException(e);
@@ -4230,6 +4243,30 @@ public Void execute(int size) throws HiveException {
return result;
}
+  public Set<Partition> getAllPartitionsWithSpecsInBatches(Table tbl, int batchSize, int decayingFactor,
+      int maxRetries, GetPartitionsRequest request) throws HiveException, TException {
+    if (!tbl.isPartitioned()) {
+      return Sets.newHashSet(new Partition(tbl));
+    }
+    Set<Partition> result = new LinkedHashSet<>();
+    RetryUtilities.ExponentiallyDecayingBatchWork batchTask = new RetryUtilities
+        .ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws HiveException, TException {
+        result.clear();
+        PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), request, size);
+ partitionIterable.forEach(result::add);
+ return null;
+ }
+ };
+ try {
+ batchTask.run();
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ return result;
+ }
+
  public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec,
      short limit) throws HiveException {
PerfLogger perfLogger = SessionState.getPerfLogger();
@@ -4466,6 +4503,69 @@ public List<Partition> getPartitionsByFilter(Table tbl, String filter)
return convertFromMetastore(tbl, tParts);
}
+  public List<Partition> getPartitionsWithSpecs(Table tbl, GetPartitionsRequest request)
+      throws HiveException, TException {
+
+    if (!tbl.isPartitioned()) {
+      throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName());
+    }
+    int batchSize = MetastoreConf.getIntVar(Hive.get().getConf(), MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+    if (batchSize > 0) {
+      return new ArrayList<>(getAllPartitionsWithSpecsInBatches(tbl, batchSize, DEFAULT_BATCH_DECAYING_FACTOR,
+          MetastoreConf.getIntVar(Hive.get().getConf(), MetastoreConf.ConfVars.GETPARTITIONS_BATCH_MAX_RETRIES), request));
+    } else {
+      return getPartitionsWithSpecsInternal(tbl, request);
+    }
+  }
+
+  List<Partition> getPartitionsWithSpecsInternal(Table tbl, GetPartitionsRequest request)
+      throws HiveException, TException {
+
+    if (!tbl.isPartitioned()) {
+      throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName());
+    }
+    GetPartitionsResponse response = getMSC().getPartitionsWithSpecs(request);
+    List<org.apache.hadoop.hive.metastore.api.PartitionSpec> partitionSpecs = response.getPartitionSpec();
+    List<Partition> partitions = new ArrayList<>();
+    partitions.addAll(convertFromPartSpec(partitionSpecs.iterator(), tbl));
+
+    return partitions;
+  }
+
+  List<Partition> getPartitionsWithSpecsByNames(Table tbl, List<String> partNames, GetPartitionsRequest request)
+      throws HiveException, TException {
+
+    if (!tbl.isPartitioned()) {
+      throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName());
+    }
+    List<Partition> partitions = new ArrayList<Partition>(partNames.size());
+
+    int batchSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX);
+    // We do not want to modify the original request when implementing batching, so we still know what actual request was being made
+    GetPartitionsRequest req = request;
+    if (!req.isSetFilterSpec()) {
+      req.setFilterSpec(new GetPartitionsFilterSpec());
+    }
+
+    try {
+      Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
+        @Override
+        public List<Void> run(List<String> list) throws Exception {
+          req.getFilterSpec().setFilters(list);
+          req.getFilterSpec().setFilterMode(PartitionFilterMode.BY_NAMES);
+          List<Partition> tParts = getPartitionsWithSpecsInternal(tbl, req);
+          if (tParts != null) {
+            partitions.addAll(tParts);
+          }
+          return Collections.emptyList();
+        }
+      });
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+    return partitions;
+  }
+
  private static List<Partition> convertFromMetastore(Table tbl,
      List<org.apache.hadoop.hive.metastore.api.Partition> partitions) throws HiveException {
if (partitions == null) {
@@ -4482,7 +4582,7 @@ private static List<Partition> convertFromMetastore(Table tbl,
  // This method converts PartitionSpec to Partition.
  // This is required because listPartitionsSpecByExpr returns a set of PartitionSpec but Hive
  // requires Partition
-  private static List<Partition> convertFromPartSpec(Iterator<PartitionSpec> iterator, Table tbl)
+  static List<Partition> convertFromPartSpec(Iterator<PartitionSpec> iterator, Table tbl)
      throws HiveException, TException {
if(!iterator.hasNext()) {
return Collections.emptyList();
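A note on the batching added to Hive.java above: it delegates to RetryUtilities.ExponentiallyDecayingBatchWork, so when a batch fails the batch size is divided by the decaying factor and the whole fetch is retried, until a run succeeds, maxRetries is exhausted, or the size reaches zero. As a worked example (matching the new test added below), batchSize=30 with decayingFactor=2 and unlimited retries produces attempts of size 30, 15, 7, 3, 1. A rough sketch of that decay schedule only (illustrative; not the RetryUtilities implementation):

    int size = 30;
    int decayingFactor = 2;
    while (size > 0) {
      System.out.println("attempt with batch size " + size);  // prints 30, 15, 7, 3, 1
      size /= decayingFactor;
    }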
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
index dabfef014ce..aa8e0d0bb91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql.metadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
+import org.apache.thrift.TException;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -99,12 +103,16 @@ private void getNextBatch() {
batchCounter++;
}
try {
-        if (isAuthRequired) {
-          batchIter = db.getPartitionsAuthByNames(table, nameBatch, userName, groupNames).iterator();
+        if (getPartitionsRequest == null) {
+          if (isAuthRequired) {
+            batchIter = db.getPartitionsAuthByNames(table, nameBatch, userName, groupNames).iterator();
+          } else {
+            batchIter = db.getPartitionsByNames(table, nameBatch, getColStats).iterator();
+          }
         } else {
-          batchIter = db.getPartitionsByNames(table, nameBatch, getColStats).iterator();
+          batchIter = db.getPartitionsWithSpecsByNames(table, nameBatch, getPartitionsRequest).iterator();
         }
-      } catch (HiveException e) {
+      } catch (HiveException | TException e) {
throw new RuntimeException(e);
}
}
@@ -137,6 +145,7 @@ enum Type {
private boolean isAuthRequired = false;
private String userName;
private List<String> groupNames;
+ private GetPartitionsRequest getPartitionsRequest;
/**
* Dummy constructor, which simply acts as an iterator on an already-present
@@ -173,6 +182,42 @@ public PartitionIterable(Hive db, Table table, Map<String, String> partialPartit
    this(db, table, partialPartitionSpec, batchSize, getColStats, false, null, null);
}
+  public PartitionIterable(Hive db, GetPartitionsRequest getPartitionsRequest, int batchSize)
+      throws HiveException {
+    if (batchSize < 1) {
+      throw new HiveException("Invalid batch size for partition iterable. Please use a batch size greater than 0");
+    }
+    this.currType = Type.LAZY_FETCH_PARTITIONS;
+    this.db = db;
+    this.table = db.getTable(getPartitionsRequest.getDbName(), getPartitionsRequest.getTblName());
+    this.batchSize = batchSize;
+    this.getPartitionsRequest = getPartitionsRequest;
+    List<String> pVals = null;
+    if (getPartitionsRequest.isSetFilterSpec()) {
+      pVals = this.getPartitionsRequest.getFilterSpec().getFilters();
+    }
+    if (pVals == null) {
+      partitionNames = db.getPartitionNames(table.getDbName(), table.getTableName(), (short) -1);
+    } else {
+      PartitionFilterMode filterMode = getPartitionsRequest.getFilterSpec().getFilterMode();
+      switch (filterMode) {
+        case BY_NAMES:
+          partitionNames = pVals;
+          break;
+        case BY_VALUES:
+          partitionNames = db.getPartitionNamesByPartitionVals(table.getDbName(), table.getTableName(), pVals, (short) -1);
+          break;
+        case BY_EXPR:
+          // TODO: this can be dealt with in a separate PR. The current changes do not have a particular use case for this.
+          throw new HiveException("getpartitionsbyexpr is currently unsupported for the getpartitionswithspecs API");
+        default:
+          throw new HiveException("No such partition filter mode: " + filterMode);
+      }
+    }
+  }
+
  private PartitionIterable(Hive db, Table table, Map<String, String> partialPartitionSpec,
      int batchSize, boolean getColStats, boolean isAuthRequired, String userName,
      List<String> groupNames) throws HiveException {
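A note on the new PartitionIterable constructor above: it resolves partition names eagerly (all names, the supplied names, or names derived from values, depending on the filter mode) and then fetches the Partition objects lazily, batchSize names at a time, via getPartitionsWithSpecsByNames. A hypothetical caller (db/table names, the projection, and the process() consumer are illustrative):

    GetProjectionsSpec projection = new GetPartitionProjectionsSpecBuilder()
        .addProjectFieldList(Arrays.asList("sd.location")).build();
    GetPartitionsRequest request = new GetPartitionsRequest("default", "events", projection, null);
    for (Partition p : new PartitionIterable(Hive.get(), request, 100)) {
      process(p);  // only the projected fields of each Partition are populated
    }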
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionTree.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionTree.java
index f518fc14996..3c1e96172fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionTree.java
@@ -18,10 +18,16 @@
package org.apache.hadoop.hive.ql.metadata;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,4 +277,68 @@ List<Partition> getPartitionsByFilter(final String filter) throws MetaException
}
return result;
}
+
+  GetPartitionsResponse getPartitionsWithSpecs(GetPartitionsRequest getPartitionsRequest) throws MetaException {
+    List<Partition> result = new ArrayList<>();
+    PartitionListComposingSpec partListComp;
+
+    PartitionSpec partitionSpec = new PartitionSpec();
+    partitionSpec.setCatName(getPartitionsRequest.getCatName());
+    partitionSpec.setDbName(getPartitionsRequest.getDbName());
+    partitionSpec.setTableName(getPartitionsRequest.getTblName());
+
+    List<PartitionSpec> partSpecs;
+
+    GetPartitionsFilterSpec filterSpec = getPartitionsRequest.getFilterSpec();
+    if (filterSpec == null) {
+      partListComp = new PartitionListComposingSpec(new ArrayList<>(parts.values()));
+      partitionSpec.setPartitionList(partListComp);
+
+      partSpecs = Arrays.asList(partitionSpec);
+      return new GetPartitionsResponse(partSpecs);
+    }
+
+    for (Map.Entry<String, Partition> entry : parts.entrySet()) {
+      Partition partition = entry.getValue();
+      boolean matches = false;
+
+      PartitionFilterMode filterMode = filterSpec.getFilterMode();
+      switch (filterMode) {
+        case BY_NAMES:
+          matches = filterSpec.getFilters().contains(entry.getKey());
+          break;
+        case BY_VALUES:
+          matches = filterSpec.getFilters().stream().anyMatch(str -> entry.getValue().getValues().contains(str));
+          break;
+        case BY_EXPR:
+          ScriptEngine se = new ScriptEngineManager().getEngineByName("JavaScript");
+          if (se == null) {
+            LOG.error("JavaScript script engine is not found, therefore partition filtering "
+                + "for temporary tables is disabled.");
+            break;
+          }
+
+          for (String filter : filterSpec.getFilters()) {
+            try {
+              se.put("partition", partition);
+              matches = (Boolean) se.eval(filter);
+            } catch (ScriptException e) {
+              throw new MetaException("Error evaluating filter expression: " + e.getMessage());
+            }
+          }
+          break;
+        default:
+          throw new MetaException("Unknown filter mode: " + filterMode);
+      }
+      if (matches) {
+        result.add(entry.getValue());
+      }
+    }
+
+    partListComp = new PartitionListComposingSpec(result);
+    partitionSpec.setPartitionList(partListComp);
+
+    partSpecs = Arrays.asList(partitionSpec);
+    return new GetPartitionsResponse(partSpecs);
+  }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index af7aca6a373..73ab1438c35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -71,6 +71,8 @@
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.GetTableResult;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
@@ -1128,6 +1130,17 @@ public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName,
return getPartitionsForMaxParts(tableName, parts, maxParts);
}
+ @Override
+  public GetPartitionsResponse getPartitionsWithSpecs(GetPartitionsRequest request)
+      throws TException {
+    org.apache.hadoop.hive.metastore.api.Table table = getTempTable(request.getDbName(), request.getTblName());
+ if (table == null) {
+ return super.getPartitionsWithSpecs(request);
+ }
+ TempTable tt = getPartitionedTempTable(table);
+ return tt.getPartitionsWithSpecs(request);
+ }
+
@Override
  public List<Partition> listPartitionsWithAuthInfo(String catName, String dbName, String tableName,
int maxParts, String userName, List<String> groupNames)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/TempTable.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/TempTable.java
index 2c178eaa7ab..646b19a7f79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/TempTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/TempTable.java
@@ -19,6 +19,8 @@
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -200,4 +202,8 @@ List<Partition> listPartitionsByFilter(String filter) throws MetaException {
return pTree.getPartitionsByFilter(filter);
}
+  GetPartitionsResponse getPartitionsWithSpecs(GetPartitionsRequest getPartitionsRequest) throws MetaException {
+ return pTree.getPartitionsWithSpecs(getPartitionsRequest);
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 2f6dd2c378e..56f40f860d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -44,12 +45,16 @@
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
@@ -330,8 +335,15 @@ private List<String> findPartitionsToAnalyze(TableName fullTableName, String cat
      List<String> currentNames = partNames.subList(nextBatchStart, nextBatchEnd);
currentBatchStart = nextBatchStart;
nextBatchStart = nextBatchEnd;
+
+      GetProjectionsSpec projectionsSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("values","parameters","writeId")).build();
+      GetPartitionsFilterSpec partitionsFilterSpec = new GetPartitionsFilterSpec();
+      partitionsFilterSpec.setFilters(currentNames);
+      partitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_NAMES);
+
try {
- currentBatch = rs.getPartitionsByNames(cat, db, tbl, currentNames);
+        currentBatch = rs.getPartitionSpecsByFilterAndProjection(t, projectionsSpec, partitionsFilterSpec);
      } catch (NoSuchObjectException e) {
        LOG.error("Failed to get partitions for " + fullTableName + ", skipping some partitions", e);
currentBatch = null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionsWithSpecsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionsWithSpecsInBatches.java
new file mode 100644
index 00000000000..62d9f75110d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionsWithSpecsInBatches.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.*;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.*;
+
+public class TestGetPartitionsWithSpecsInBatches {
+ private final String catName = "hive";
+ private final String dbName = "default";
+ private final String tableName = "test_partition_batch";
+ private static HiveConf hiveConf;
+ private static HiveMetaStoreClient msc;
+ private static Hive hive;
+ private Table table;
+ private static final int NUM_PARTITIONS = 30;
+
+ @BeforeClass
+ public static void setupClass() throws HiveException {
+ hiveConf = new HiveConfForTest(TestGetPartitionsWithSpecsInBatches.class);
+ SessionState ss = SessionState.start(hiveConf);
+ hive = ss.getHiveDb();
+ try {
+ msc = new HiveMetaStoreClient(hiveConf);
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ PartitionUtil.createPartitionedTable(msc, catName, dbName, tableName);
+ table = msc.getTable(catName, dbName, tableName);
+    PartitionUtil.addPartitions(msc, dbName, tableName, table.getSd().getLocation(), hiveConf, NUM_PARTITIONS);
+ }
+
+ @After
+ public void after() {
+ PartitionUtil.cleanUpTableQuietly(msc, catName, dbName, tableName);
+ }
+
+  @Test
+  public void testNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+    // test with a batch size of 10 and decaying factor of 2
+    hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 10, 2, 0,
+        new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // there should be 3 calls to get partitions
+    verify(spyMSC, times(3)).getPartitionsWithSpecs(req.capture());
+    Assert.assertEquals(10, req.getValue().getFilterSpec().getFiltersSize());
+  }
+
+  @Test
+  public void testUnevenNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+
+    List<GetPartitionsRequest> capturedRequests = new ArrayList<>();
+    doAnswer((Answer<GetPartitionsResponse>) invocation -> {
+      GetPartitionsRequest request = invocation.getArgument(0);
+      capturedRequests.add(new GetPartitionsRequest(request));
+      return (GetPartitionsResponse) invocation.callRealMethod();
+    }).when(spyMSC).getPartitionsWithSpecs(any(GetPartitionsRequest.class));
+
+    // there should be 2 calls to get partitions with batch sizes of 19, 11
+    hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 19, 2, 0,
+        new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // there should be 2 calls to get partitions
+    verify(spyMSC, times(2)).getPartitionsWithSpecs(req.capture());
+    // confirm the batch sizes were 19, 11 in the two calls to get partitions
+    Assert.assertEquals(19, capturedRequests.get(0).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(11, capturedRequests.get(1).getFilterSpec().getFiltersSize());
+  }
+
+  @Test
+  public void testSmallNumberOfPartitions() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+    hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 100, 2, 0,
+        new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // there should be 1 call to get partitions
+    verify(spyMSC, times(1)).getPartitionsWithSpecs(req.capture());
+    Assert.assertEquals(30, req.getValue().getFilterSpec().getFiltersSize());
+  }
+
+  @Test
+  public void testRetriesExhaustedBatchSize() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+
+    List<GetPartitionsRequest> capturedRequests = new ArrayList<>();
+    doAnswer((Answer<Void>) invocation -> {
+      GetPartitionsRequest request = invocation.getArgument(0);
+      capturedRequests.add(new GetPartitionsRequest(request));
+      throw new MetaException("MetaException to test retries");
+    }).when(spyMSC).getPartitionsWithSpecs(any(GetPartitionsRequest.class));
+
+    try {
+      hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 30, 2, 0,
+          new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // there should be 5 calls to get partitions with batch sizes of 30, 15, 7, 3, 1
+    verify(spyMSC, times(5)).getPartitionsWithSpecs(req.capture());
+
+    Assert.assertEquals(5, capturedRequests.size());
+
+    Assert.assertEquals(30, capturedRequests.get(0).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(15, capturedRequests.get(1).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(7, capturedRequests.get(2).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(3, capturedRequests.get(3).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(1, capturedRequests.get(4).getFilterSpec().getFiltersSize());
+  }
+
+  @Test
+  public void testMaxRetriesReached() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+
+    List<GetPartitionsRequest> capturedRequests = new ArrayList<>();
+    doAnswer((Answer<Void>) invocation -> {
+      GetPartitionsRequest request = invocation.getArgument(0);
+      capturedRequests.add(new GetPartitionsRequest(request));
+      throw new MetaException("MetaException to test retries");
+    }).when(spyMSC).getPartitionsWithSpecs(any(GetPartitionsRequest.class));
+
+    try {
+      hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 30, 2, 2,
+          new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // there should be 2 calls to get partitions with batch sizes of 30, 15
+    verify(spyMSC, times(2)).getPartitionsWithSpecs(req.capture());
+
+    Assert.assertEquals(2, capturedRequests.size());
+
+    Assert.assertEquals(30, capturedRequests.get(0).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(15, capturedRequests.get(1).getFilterSpec().getFiltersSize());
+  }
+
+  @Test
+  public void testBatchingWhenException() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+
+    List<GetPartitionsRequest> capturedRequests = new ArrayList<>();
+    AtomicInteger invocationCount = new AtomicInteger();
+    // This will throw an exception only the first time.
+    doAnswer((Answer<GetPartitionsResponse>) invocation -> {
+      invocationCount.getAndIncrement();
+      GetPartitionsRequest request = invocation.getArgument(0);
+      capturedRequests.add(new GetPartitionsRequest(request));
+
+      if (invocationCount.get() == 1) {
+        throw new MetaException();
+      } else {
+        return (GetPartitionsResponse) invocation.callRealMethod();
+      }
+    }).when(spyMSC).getPartitionsWithSpecs(any(GetPartitionsRequest.class));
+
+    hive.getAllPartitionsWithSpecsInBatches(hive.getTable(dbName, tableName), 30, 2, 5,
+        new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    ArgumentCaptor<GetPartitionsRequest> req = ArgumentCaptor.forClass(GetPartitionsRequest.class);
+    // The first call with batch size of 30 will fail; the remaining two calls will be of size 15 each. Total 3 calls.
+    verify(spyMSC, times(3)).getPartitionsWithSpecs(req.capture());
+
+    Assert.assertEquals(3, capturedRequests.size());
+
+    Assert.assertEquals(30, capturedRequests.get(0).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(15, capturedRequests.get(1).getFilterSpec().getFiltersSize());
+    Assert.assertEquals(15, capturedRequests.get(2).getFilterSpec().getFiltersSize());
+
+    Set<String> partNames = new HashSet<>(capturedRequests.get(1).getFilterSpec().getFilters());
+    partNames.addAll(capturedRequests.get(2).getFilterSpec().getFilters());
+    assert(partNames.size() == 30);
+
+    List<String> partitionNames = hive.getPartitionNames(table.getDbName(), table.getTableName(), (short) -1);
+    assert(partitionNames.size() == 30);
+    partitionNames.forEach(partNames::remove);
+    assert(partitionNames.size() == 30);
+    // If hive.getAllPartitionsWithSpecsInBatches returned a duplicate or incomplete list, the assertion below would fail
+    assert(partNames.size() == 0);
+  }
+
+  @Test
+  public void testBatchingWhenBatchSizeIsZero() throws MetaException {
+    HiveMetaStoreClient spyMSC = spy(msc);
+    hive.setMSC(spyMSC);
+    int batchSize = 0;
+    try {
+      org.apache.hadoop.hive.ql.metadata.Table t = hive.getTable(dbName, tableName);
+      new PartitionIterable(hive,
+          new GetPartitionsRequest(t.getDbName(), t.getTableName(), new GetProjectionsSpec(), new GetPartitionsFilterSpec()),
+          batchSize);
+    } catch (HiveException e) {
+      Assert.assertTrue(e.getMessage().contains("Invalid batch size for partition iterable." +
+          " Please use a batch size greater than 0"));
+    }
+    try {
+      new org.apache.hadoop.hive.metastore.PartitionIterable(msc, table, batchSize)
+          .withProjectSpec(new GetPartitionsRequest(dbName, tableName, new GetProjectionsSpec(), new GetPartitionsFilterSpec()));
+    } catch (MetastoreException e) {
+      Assert.assertTrue(e.getMessage().contains("Invalid batch size for partition iterable." +
+          " Please use a batch size greater than 0"));
+    }
+  }
+}
diff --git a/ql/src/test/results/clientnegative/exchange_partition_neg_partition_missing.q.out b/ql/src/test/results/clientnegative/exchange_partition_neg_partition_missing.q.out
index 6757e6089d9..8eb6ea0bb3c 100644
--- a/ql/src/test/results/clientnegative/exchange_partition_neg_partition_missing.q.out
+++ b/ql/src/test/results/clientnegative/exchange_partition_neg_partition_missing.q.out
@@ -20,4 +20,4 @@ PREHOOK: Input: default@exchange_part_test1
POSTHOOK: query: SHOW PARTITIONS exchange_part_test1
POSTHOOK: type: SHOWPARTITIONS
POSTHOOK: Input: default@exchange_part_test1
-FAILED: SemanticException [Error 10006]: Partition not found {ds=2013-04-05}
+FAILED: SemanticException [Error 10006]: Partition not found GetPartitionsFilterSpec(filterMode:BY_VALUES, filters:[2013-04-05]) for the following partition keys: [FieldSchema(name:ds, type:string, comment:null)]
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
index 63497e7036f..4afc779cb66 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
@@ -270,11 +270,10 @@ void checkTable(String catName, String dbName, String tableName, byte[] filterEx
          MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DEFAULTPARTITIONNAME), results);
      parts = new PartitionIterable(results);
    } else {
+      GetProjectionsSpec projectionsSpec = new GetPartitionProjectionsSpecBuilder()
+          .addProjectFieldList(Arrays.asList("sd.location","createTime","values")).build();
      GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
-          null, null);
-      request.setProjectionSpec(new GetPartitionProjectionsSpecBuilder().addProjectField("sd.location")
-          .addProjectField("createTime").addProjectField("tableName")
-          .addProjectField("values").build());
+          projectionsSpec, null);
      request.setCatName(table.getCatName());
      int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
      if (batchSize > 0) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
index 6bf898b09af..bfb82e9d35a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hive.metastore.client.builder;
+import org.apache.commons.collections.CollectionUtils;
+
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
@@ -41,6 +44,14 @@ public GetPartitionProjectionsSpecBuilder addProjectField(String field) {
return this;
}
+  public GetPartitionProjectionsSpecBuilder addProjectFieldList(List<String> fields) {
+ fieldList.addAll(Arrays.asList("catName","dbName","tableName"));
+ if (CollectionUtils.isNotEmpty(fields)) {
+ fieldList.addAll(fields);
+ }
+ return this;
+ }
+
  public GetPartitionProjectionsSpecBuilder setIncludePartitionPattern(String includePartitionPattern) {
this.includePartitionPattern = includePartitionPattern;
return this;
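One behavioral note on the builder change above: addProjectFieldList unconditionally prepends catName, dbName and tableName to the projection, presumably so the returned PartitionSpecs always carry enough identity to be converted back into Partition objects. For example (illustrative):

    GetProjectionsSpec spec = new GetPartitionProjectionsSpecBuilder()
        .addProjectFieldList(Arrays.asList("sd.location")).build();
    // spec.getFieldList() is [catName, dbName, tableName, sd.location]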