This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f9489b6b2f2 HIVE-28205: Implement direct sql for
get_partitions_ps_with_auth api (#5206) (Wechar Yu, reviewed by Zhihua Deng,
Butao Zhang)
f9489b6b2f2 is described below
commit f9489b6b2f24fa1d434cccf9b416067b3e67837a
Author: Wechar Yu <[email protected]>
AuthorDate: Wed Jun 19 09:23:04 2024 +0800
HIVE-28205: Implement direct sql for get_partitions_ps_with_auth api
(#5206) (Wechar Yu, reviewed by Zhihua Deng, Butao Zhang)
---
.../hive/metastore/utils/MetaStoreUtils.java | 20 +++
.../hadoop/hive/metastore/MetaStoreDirectSql.java | 52 +++++++-
.../apache/hadoop/hive/metastore/ObjectStore.java | 139 +++++++++++----------
.../hadoop/hive/metastore/tools/BenchmarkTool.java | 5 +
.../hadoop/hive/metastore/tools/HMSBenchmarks.java | 27 ++++
.../hadoop/hive/metastore/tools/HMSClient.java | 5 +
.../apache/hadoop/hive/metastore/tools/Util.java | 32 +++++
7 files changed, 206 insertions(+), 74 deletions(-)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index c12c009b81c..bbd36befcd7 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -460,6 +460,26 @@ public class MetaStoreUtils {
}
return pvals;
}
+
+  /**
+   * Checks whether a list of partition values places no constraint at all, i.e. whether
+   * every supplied value is either {@code null} or the empty string. When this holds the
+   * caller is effectively asking for every partition of the table, so a plain
+   * "get all partitions" lookup can be used instead of a partial-values filter.
+   *
+   * @param partVals the partition values used to filter the partitions; may be {@code null}
+   * @return {@code true} if {@code partVals} is {@code null}, empty, or contains only
+   *         {@code null} or empty strings; {@code false} otherwise
+   */
+  public static boolean arePartValsEmpty(List<String> partVals) {
+    // allMatch is vacuously true for an empty list, covering the isEmpty() case.
+    return partVals == null
+        || partVals.stream().allMatch(val -> val == null || val.isEmpty());
+  }
+
public static String makePartNameMatcher(Table table, List<String> partVals,
String defaultStr) throws MetaException {
List<FieldSchema> partCols = table.getPartitionKeys();
int numPartKeys = partCols.size();
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 8c6701ae138..d910504a9d3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -762,6 +762,28 @@ class MetaStoreDirectSql {
return getPartitionsByPartitionIdsInBatch(catName, dbName, tableName,
partitionIds, isAcidTable, args);
}
+
+  /**
+   * Fetches, via direct SQL, the partitions of {@code table} whose PART_NAME matches the
+   * partial partition values carried by {@code args}.
+   *
+   * <p>The partial values are turned into a SQL LIKE pattern by
+   * {@code MetaStoreUtils.makePartNameMatcher} (with {@code "_%"} as its default string —
+   * presumably substituted for unspecified values; confirm against that helper). The
+   * matching PART_IDs are looked up first, honouring {@code args.getMax()}. The partitions
+   * are then materialised through {@code Batchable.runBatched} using {@code batchSize}.
+   *
+   * @param table the partitioned table to query
+   * @param args request arguments; {@code getPart_vals()} and {@code getMax()} are read here
+   *             and the whole object is forwarded when loading the partitions
+   * @return the matching partitions, or an empty list if no partition name matches
+   * @throws MetaException if building the pattern or running a query fails
+   */
+  public List<Partition> getPartitionsViaSqlPs(Table table, GetPartitionsArgs args)
+      throws MetaException {
+    final String catName = table.getCatName();
+    final String dbName = table.getDbName();
+    final String tblName = table.getTableName();
+
+    // Restrict on PART_NAME with a LIKE pattern derived from the partial values.
+    String namePattern = MetaStoreUtils.makePartNameMatcher(table, args.getPart_vals(), "_%");
+    String likeFilter = PARTITIONS + ".\"PART_NAME\" like ? ";
+    List<Long> matchingIds = getPartitionFieldsViaSqlFilter(catName, dbName, tblName,
+        Arrays.asList("\"PART_ID\""), likeFilter, Arrays.asList(namePattern),
+        Collections.emptyList(), args.getMax());
+    if (matchingIds.isEmpty()) {
+      return Collections.emptyList(); // nothing matched, skip the batched fetch
+    }
+
+    final boolean isAcid = TxnUtils.isAcidTable(table);
+    return Batchable.runBatched(batchSize, matchingIds, new Batchable<Long, Partition>() {
+      @Override
+      public List<Partition> run(List<Long> idBatch) throws MetaException {
+        return getPartitionsByPartitionIds(catName, dbName, tblName, idBatch, isAcid, args);
+      }
+    });
+  }
+
/**
* This method can be used to return "partially-filled" partitions when
clients are only interested in
* some fields of the Partition objects. The partitionFields parameter is a
list of dot separated
@@ -1250,7 +1272,6 @@ class MetaStoreDirectSql {
}
public int getNumPartitionsViaSqlFilter(SqlFilterForPushdown filter) throws
MetaException {
- boolean doTrace = LOG.isDebugEnabled();
String catName = filter.catName.toLowerCase();
String dbName = filter.dbName.toLowerCase();
String tblName = filter.tableName.toLowerCase();
@@ -1273,13 +1294,32 @@ class MetaStoreDirectSql {
params[i + 3] = filter.params.get(i);
}
- long start = doTrace ? System.nanoTime() : 0;
try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
query.setUnique(true);
- int sqlResult =
MetastoreDirectSqlUtils.extractSqlInt(query.executeWithArray(params));
- long queryTime = doTrace ? System.nanoTime() : 0;
- MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start,
queryTime);
- return sqlResult;
+ return
MetastoreDirectSqlUtils.extractSqlInt(executeWithArray(query.getInnerQuery(),
params, queryText));
+ }
+ }
+
+  /**
+   * Counts, via direct SQL, the partitions of {@code table} whose PART_NAME matches the
+   * given partial partition values.
+   *
+   * @param table the partitioned table; its table, database and catalog names scope the query
+   * @param partVals partial partition values, turned into a LIKE pattern on PART_NAME
+   * @return the number of matching partitions
+   * @throws MetaException if building the pattern or running the query fails
+   */
+  public int getNumPartitionsViaSqlPs(Table table, List<String> partVals) throws MetaException {
+    String namePattern = MetaStoreUtils.makePartNameMatcher(table, partVals, "_%");
+
+    // Count the PART_IDs of this table's partitions whose PART_NAME matches the pattern.
+    String queryText = "select count(" + PARTITIONS + ".\"PART_ID\") from " + PARTITIONS
+        + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\" "
+        + " and " + TBLS + ".\"TBL_NAME\" = ? "
+        + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+        + " and " + DBS + ".\"NAME\" = ? "
+        + " where " + DBS + ".\"CTLG_NAME\" = ? and " + PARTITIONS + ".\"PART_NAME\" like ? ";
+
+    // Bind order follows the placeholders: table name, db name, catalog name, name pattern.
+    Object[] params = new Object[] {
+        table.getTableName(), table.getDbName(), table.getCatName(), namePattern
+    };
+
+    try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
+      query.setUnique(true);
+      return MetastoreDirectSqlUtils.extractSqlInt(
+          executeWithArray(query.getInnerQuery(), params, queryText));
     }
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index ec0b62e78a8..5ea24bc7535 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -3840,34 +3840,39 @@ public class ObjectStore implements RawStore, Configurable {
@Override
public int getNumPartitionsByPs(String catName, String dbName, String
tblName, List<String> partVals)
throws MetaException, NoSuchObjectException {
- boolean success = false;
- Query query = null;
- Long result;
- try {
- openTransaction();
- LOG.debug("executing getNumPartitionsByPs");
- catName = normalizeIdentifier(catName);
- dbName = normalizeIdentifier(dbName);
- tblName = normalizeIdentifier(tblName);
- Table table = getTable(catName, dbName, tblName);
- if (table == null) {
- throw new NoSuchObjectException(TableName.getQualified(catName,
dbName, tblName)
- + " table not found");
+
+ catName = normalizeIdentifier(catName);
+ dbName = normalizeIdentifier(dbName);
+ tblName = normalizeIdentifier(tblName);
+
+ // Counts the partitions of the table that match the partial values in partVals.
+ // NOTE(review): the trailing (true, true) GetHelper flags presumably allow both the
+ // direct-SQL and the JDO implementation below; confirm against GetHelper.
+ return new GetHelper<Integer>(catName, dbName, tblName, true, true) {
+
+ @Override
+ protected String describeResult() {
+ return "Partition count by partial values";
}
- // size is known since it contains dbName, catName, tblName and
partialRegex pattern
- Map<String, String> params = new HashMap<>(4);
- String filter = getJDOFilterStrForPartitionVals(table, partVals, params);
- query = pm.newQuery(
- "select count(partitionName) from
org.apache.hadoop.hive.metastore.model.MPartition"
- );
- query.setFilter(filter);
- query.declareParameters(makeParameterDeclarationString(params));
- result = (Long) query.executeWithMap(params);
- success = commitTransaction();
- } finally {
- rollbackAndCleanup(success, query);
- }
- return result.intValue();
+
+ // Direct-SQL path: delegate the count to MetaStoreDirectSql.getNumPartitionsViaSqlPs.
+ @Override
+ protected Integer getSqlResult(GetHelper<Integer> ctx) throws
MetaException {
+ return directSql.getNumPartitionsViaSqlPs(ctx.getTable(), partVals);
+ }
+
+ // JDO path: JDOQL count(partitionName) over MPartition with the partial-values filter.
+ @Override
+ protected Integer getJdoResult(GetHelper<Integer> ctx)
+ throws MetaException, NoSuchObjectException, InvalidObjectException {
+ // size is known since it contains dbName, catName, tblName and
partialRegex pattern
+ Map<String, String> params = new HashMap<>(4);
+ String filter = getJDOFilterStrForPartitionVals(ctx.getTable(),
partVals, params);
+ // try-with-resources closes the QueryWrapper around the JDOQL count query.
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery(
+ "select count(partitionName) from
org.apache.hadoop.hive.metastore.model.MPartition"))) {
+ query.setFilter(filter);
+ query.declareParameters(makeParameterDeclarationString(params));
+ Long result = (Long) query.executeWithMap(params);
+
+ // The count comes back as a Long; this method's contract is int.
+ return result.intValue();
+ }
+ }
+ }.run(true);
}
/**
@@ -3884,8 +3889,10 @@ public class ObjectStore implements RawStore, Configurable {
* you want results for. E.g., if resultsCol is partitionName, the
Collection
* has types of String, and if resultsCol is null, the types are
MPartition.
*/
- private Collection<String> getPartitionPsQueryResults(String catName, String
dbName, String tableName, List<String> part_vals,
- int max_parts, String resultsCol) throws Exception {
+ private Collection<String> getPartitionPsQueryResults(String catName, String
dbName,
+ String tableName,
List<String> part_vals,
+ int max_parts, String
resultsCol)
+ throws MetaException, NoSuchObjectException {
Preconditions.checkState(this.currentTransaction.isActive());
@@ -3900,7 +3907,7 @@ public class ObjectStore implements RawStore, Configurable {
// pattern
Map<String, String> params = new HashMap<>(4);
String filter = getJDOFilterStrForPartitionVals(table, part_vals, params);
- try (Query query = pm.newQuery(MPartition.class)) {
+ try (QueryWrapper query = new QueryWrapper(pm.newQuery(MPartition.class)))
{
query.setFilter(filter);
query.declareParameters(makeParameterDeclarationString(params));
if (max_parts >= 0) {
@@ -3917,27 +3924,6 @@ public class ObjectStore implements RawStore, Configurable {
}
}
- /**
- * If partVals all the values are empty strings, it means we are returning
- * all the partitions and hence we can attempt to use a directSQL equivalent
API which
- * is considerably faster.
- * @param partVals The partitions values used to filter out the partitions.
- * @return true if partVals is empty or if all the values in partVals is
empty strings.
- * other wise false. If user or groups is valid then returns false since the
directSQL
- * doesn't support partition privileges.
- */
- private boolean canTryDirectSQL(List<String> partVals) {
- if (partVals == null || partVals.isEmpty()) {
- return true;
- }
- for (String val : partVals) {
- if (val != null && !val.isEmpty()) {
- return false;
- }
- }
- return true;
- }
-
@Override
public List<Partition> listPartitionsPsWithAuth(String catName, String
db_name, String tbl_name,
GetPartitionsArgs args) throws MetaException, InvalidObjectException,
NoSuchObjectException {
@@ -3952,32 +3938,19 @@ public class ObjectStore implements RawStore, Configurable {
throw new NoSuchObjectException(
TableName.getQualified(catName, db_name, tbl_name) + " table not
found");
}
- int max_parts = args.getMax();
String userName = args.getUserName();
List<String> groupNames = args.getGroupNames();
List<String> part_vals = args.getPart_vals();
List<String> partNames = args.getPartNames();
- boolean isAcidTable = TxnUtils.isAcidTable(mtbl.getParameters());
boolean getauth = null != userName && null != groupNames &&
"TRUE".equalsIgnoreCase(
mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"));
- // When partNames is given, sending to JDO directly.
- if (canTryDirectSQL(part_vals) && partNames == null) {
- LOG.info(
- "Redirecting to directSQL enabled API: db: {} tbl: {} partVals:
{}",
- db_name, tbl_name, part_vals);
+ if (MetaStoreUtils.arePartValsEmpty(part_vals) && partNames == null) {
partitions = getPartitions(catName, db_name, tbl_name, args);
+ } else if (partNames != null) {
+ partitions = getPartitionsByNames(catName, db_name, tbl_name, args);
} else {
- if (partNames != null) {
- partitions.addAll(getPartitionsViaOrmFilter(catName, db_name,
tbl_name, isAcidTable, args));
- } else {
- Collection parts = getPartitionPsQueryResults(catName, db_name,
tbl_name,
- part_vals, max_parts, null);
- for (Object o : parts) {
- Partition part = convertToPart(catName, db_name, tbl_name,
(MPartition) o, isAcidTable, args);
- partitions.add(part);
- }
- }
+ partitions = getPartitionsByPs(catName, db_name, tbl_name, args);
}
if (getauth) {
for (Partition part : partitions) {
@@ -3999,6 +3972,36 @@ public class ObjectStore implements RawStore, Configurable {
return partitions;
}
+  /**
+   * Lists the partitions of a table that match the partial partition values in
+   * {@code args}.
+   *
+   * <p>Runs through {@code GetListHelper}, with a direct-SQL implementation
+   * ({@code MetaStoreDirectSql.getPartitionsViaSqlPs}) and a JDO implementation
+   * ({@code getPartitionPsQueryResults} plus {@code convertToPart}). NOTE(review): the
+   * two {@code true} flags presumably enable both paths; confirm against GetHelper.
+   *
+   * @param catName catalog name (normalized here)
+   * @param dbName database name (normalized here)
+   * @param tblName table name (normalized here)
+   * @param args request arguments; partial values, max count and projection settings
+   * @return the matching partitions
+   * @throws MetaException on metastore failures
+   * @throws NoSuchObjectException if the table cannot be found
+   */
+  private List<Partition> getPartitionsByPs(String catName, String dbName,
+                                            String tblName, GetPartitionsArgs args)
+      throws MetaException, NoSuchObjectException {
+    catName = normalizeIdentifier(catName);
+    dbName = normalizeIdentifier(dbName);
+    tblName = normalizeIdentifier(tblName);
+
+    return new GetListHelper<Partition>(catName, dbName, tblName, true, true) {
+
+      @Override
+      protected List<Partition> getSqlResult(GetHelper<List<Partition>> ctx)
+          throws MetaException {
+        return directSql.getPartitionsViaSqlPs(ctx.getTable(), args);
+      }
+
+      @Override
+      protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx)
+          throws MetaException, NoSuchObjectException {
+        // With a null result column the query yields MPartition objects despite the
+        // declared Collection<String> return type, so iterate as Object and cast
+        // instead of relying on a raw Collection.
+        Collection<?> parts = getPartitionPsQueryResults(catName, dbName, tblName,
+            args.getPart_vals(), args.getMax(), null);
+        boolean isAcidTable = TxnUtils.isAcidTable(ctx.getTable());
+        List<Partition> result = new ArrayList<>();
+        for (Object o : parts) {
+          result.add(convertToPart(catName, dbName, tblName, (MPartition) o, isAcidTable, args));
+        }
+        return result;
+      }
+    }.run(true);
+  }
+
@Override
public List<String> listPartitionNamesPs(String catName, String dbName,
String tableName,
List<String> part_vals, short max_parts) throws MetaException,
NoSuchObjectException {
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 5e3cb133953..551ffabe6b9 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -59,6 +59,7 @@ import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetP
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitions;
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionsByFilter;
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionsByName;
+import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionsByPs;
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionsStat;
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetTable;
import static
org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListAllTables;
@@ -289,6 +290,8 @@ public class BenchmarkTool implements Runnable {
() -> benchmarkGetPartitionsByName(bench, bData, 1))
.add("getPartitionsByFilter",
() -> benchmarkGetPartitionsByFilter(bench, bData, 1))
+ .add("getPartitionsByPs",
+ () -> benchmarkGetPartitionsByPs(bench, bData, 1))
.add("getPartitionsStat",
() -> benchmarkGetPartitionsStat(bench, bData, 1))
.add("updatePartitionsStat",
@@ -319,6 +322,8 @@ public class BenchmarkTool implements Runnable {
() -> benchmarkGetPartitionsByName(bench, bData, howMany))
.add("getPartitionsByFilter" + '.' + howMany,
() -> benchmarkGetPartitionsByFilter(bench, bData, howMany))
+ .add("getPartitionsByPs" + '.' + howMany,
+ () -> benchmarkGetPartitionsByPs(bench, bData, howMany))
.add("getPartitionsStat" + '.' + howMany,
() -> benchmarkGetPartitionsStat(bench, bData, howMany))
.add("updatePartitionsStat" + '.' + howMany,
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index 1f7a0d4ad35..c01200c33be 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -43,6 +43,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitions;
import static
org.apache.hadoop.hive.metastore.tools.Util.addManyPartitionsNoException;
@@ -438,6 +440,31 @@ final class HMSBenchmarks {
}
}
+  /**
+   * Benchmarks fetching partitions by a partial partition specification through
+   * {@code HMSClient#getPartitionsByPs}.
+   *
+   * <p>Creates a table partitioned by (p_a, p_b, p_c) and adds {@code count} partitions
+   * [a0, b0, c0], [a0, b1, c1], .... It then measures fetching them by the single
+   * leading value "a0". The table is dropped afterwards, even if measuring fails.
+   *
+   * @param bench the micro-benchmark driver
+   * @param data benchmark data providing the client, database and table names
+   * @param count number of partitions to create
+   * @return timing statistics collected by {@code bench}
+   */
+  static DescriptiveStatistics benchmarkGetPartitionsByPs(@NotNull MicroBenchmark bench,
+                                                          @NotNull BenchData data,
+                                                          int count) {
+    final HMSClient client = data.getClient();
+    final String dbName = data.dbName;
+    final String tableName = data.tableName;
+
+    BenchmarkUtils.createPartitionedTable(client, dbName, tableName,
+        createSchema(Arrays.asList("p_a", "p_b", "p_c")));
+    try {
+      // Every partition shares p_a = "a0"; p_b and p_c vary with the index.
+      List<List<String>> partValues = IntStream.range(0, count)
+          .mapToObj(idx -> Arrays.asList("a0", "b" + idx, "c" + idx))
+          .collect(Collectors.toList());
+      addManyPartitionsNoException(client, dbName, tableName, null, partValues);
+      return bench.measure(() -> throwingSupplierWrapper(
+          () -> client.getPartitionsByPs(dbName, tableName, Arrays.asList("a0"))));
+    } finally {
+      throwingSupplierWrapper(() -> client.dropTable(dbName, tableName));
+    }
+  }
+
static DescriptiveStatistics benchmarkGetPartitionsStat(@NotNull
MicroBenchmark bench,
@NotNull BenchData
data,
int count) {
diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
index 8b94c423ce2..c6c0ba44a7c 100644
--- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
+++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
@@ -341,6 +341,11 @@ final class HMSClient implements AutoCloseable {
return client.get_partitions_by_filter(dbName, tableName, filter, (short)
-1);
}
+  /**
+   * Gets the partitions of a table that match partial partition values by calling
+   * {@code get_partitions_ps_with_auth}. It passes -1 as the maximum count (as the
+   * sibling filter call does) and {@code null} for the user name and group names.
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @param partVals partial partition values
+   * @return the matching partitions
+   * @throws TException if the metastore call fails
+   */
+  List<Partition> getPartitionsByPs(@NotNull String dbName, @NotNull String tableName,
+                                    @NotNull List<String> partVals) throws TException {
+    final short noLimit = -1;
+    return client.get_partitions_ps_with_auth(dbName, tableName, partVals, noLimit, null, null);
+  }
+
PartitionsStatsResult getPartitionsStats(PartitionsStatsRequest request)
throws TException {
return client.get_partitions_statistics_req(request);
}
diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
index 264bfae16b1..b94bd80e7e4 100644
--- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
+++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
@@ -595,6 +595,26 @@ public final class Util {
.collect(Collectors.toList());
}
+  /**
+   * Builds one partition object per entry of {@code values}, all belonging to
+   * {@code table} and sharing the same {@code parameters}.
+   *
+   * @param table the table the partitions belong to
+   * @param parameters partition parameters applied to every partition; may be null
+   * @param values one list of partition values per partition to build
+   * @return the built partitions, in the same order as {@code values}
+   */
+  static List<Partition> createManyPartitions(@NotNull Table table,
+                                              @Nullable Map<String, String> parameters,
+                                              @NotNull List<List<String>> values) {
+    return values.stream()
+        .map(partVals -> new PartitionBuilder(table)
+            .withParameters(parameters)
+            .withValues(partVals)
+            .build())
+        .collect(Collectors.toList());
+  }
+
/**
* Add many partitions in one HMS call
*
@@ -648,6 +668,18 @@ public final class Util {
addManyPartitions(client, dbName, tableName, parameters,
arguments, npartitions));
}
+  /**
+   * Adds one partition per entry of {@code values} to an existing table. It fetches the
+   * table, then registers every partition with a single {@code addPartitions} call.
+   * Everything runs inside {@code throwingSupplierWrapper}, which presumably converts
+   * checked exceptions to unchecked ones (confirm against that helper).
+   *
+   * @param client HMS client
+   * @param dbName database name
+   * @param tableName table name
+   * @param parameters partition parameters applied to every partition; may be null
+   * @param values one list of partition values per partition to add
+   */
+  static void addManyPartitionsNoException(@NotNull HMSClient client,
+                                           @NotNull String dbName,
+                                           @NotNull String tableName,
+                                           @Nullable Map<String, String> parameters,
+                                           @NotNull List<List<String>> values) {
+    throwingSupplierWrapper(() -> {
+      Table table = client.getTable(dbName, tableName);
+      client.addPartitions(createManyPartitions(table, parameters, values));
+      return null;
+    });
+  }
+
static void updateManyPartitionsStatsNoException(@NotNull HMSClient client,
@NotNull String dbName,
@NotNull String tableName,