This is an automated email from the ASF dual-hosted git repository.
ngangam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 63c6d8b HIVE-25800: Improvement in loadDynamicPartitions() to not
load all partitions from HMS for managed table (Sourabh Goyal, reviewed by
Laszlo Pinter)
63c6d8b is described below
commit 63c6d8ba70dfa59e791e1e49f44629c22fb41b7f
Author: Sourabh Goyal <[email protected]>
AuthorDate: Mon Dec 13 09:32:28 2021 -0800
HIVE-25800: Improvement in loadDynamicPartitions() to not load all
partitions from HMS for managed table (Sourabh Goyal, reviewed by Laszlo Pinter)
HIVE-20661 added an improvement in loadDynamicPartitions() api in Hive.java
to not add partitions one
by one in HMS. It used to fetch all the existing partitions for a table
from HMS and compare that with
dynamic partitions list to decide old and new partitions to be added to HMS
(in batches). The call to
fetch all partitions has introduced a performance regression for tables
with large number of
partitions (of the order of 100K).
This is fixed for external tables in HIVE-25178. However for ACID tables
there is an open Jira HIVE-25187.
Until we have an appropriate fix in HIVE-25187, we can skip fetching all
partitions. Instead, in the
threadPool which loads each partition individually, call getPartition() to
check if the partition already
exists in HMS or not. This will introduce additional getPartition() call
for every partition to be loaded
dynamically but does not fetch all existing partitions for a table anymore.
Change-Id: I1308d51d56d77aae2c8378e153d002b1d13f7cc1
---
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 39 ++++++++++------------
1 file changed, 17 insertions(+), 22 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4457075..691d63e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3039,11 +3039,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
final SessionState parentSession = SessionState.get();
List<Callable<Partition>> tasks = Lists.newLinkedList();
+ boolean fetchPartitionInfo = true;
final boolean scanPartitionsByName = conf.getBoolean(
ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_SCAN_SPECIFIC_PARTITIONS.varname, false);
+      // ACID table can be a bigger change. Filed HIVE-25817 for an appropriate fix for ACID tables
+      // For now, for ACID tables, skip getting all partitions for a table from HMS (since that
+      // can degrade performance for large partitioned tables) and instead make getPartition() call
+      // for every dynamic partition
       if (scanPartitionsByName && !tbd.isDirectInsert() && !AcidUtils.isTransactionalTable(tbl)) {
-        //TODO: Need to create separate ticket for ACID table; ACID table can be a bigger change.
         //Fetch only relevant partitions from HMS for checking old partitions
List<String> partitionNames = new LinkedList<>();
for(PartitionDetails details : partitionDetailsMap.values()) {
@@ -3061,38 +3065,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
entry.getValue().hasOldPartition = true;
});
}
-    } else {
-
-      // fetch all the partitions matching the part spec using the partition iterable
-      // this way the maximum batch size configuration parameter is considered
-      PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
-          conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
-      Iterator<Partition> iterator = partitionIterable.iterator();
-
-      // Match valid partition path to partitions
-      while (iterator.hasNext()) {
-        Partition partition = iterator.next();
-        partitionDetailsMap.entrySet().parallelStream()
-            .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
-            .findAny().ifPresent(entry -> {
-              entry.getValue().partition = partition;
-              entry.getValue().hasOldPartition = true;
-            });
-      }
+      // no need to fetch partition again in tasks since we have already fetched partitions
+      // info in getPartitionsByNames()
+      fetchPartitionInfo = false;
     }
boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null;
     for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+ boolean getPartitionFromHms = fetchPartitionInfo;
tasks.add(() -> {
PartitionDetails partitionDetails = entry.getValue();
Map<String, String> fullPartSpec = partitionDetails.fullSpec;
try {
-
SessionState.setCurrentSessionState(parentSession);
+ if (getPartitionFromHms) {
+ // didn't fetch partition info from HMS. Getting from HMS now.
+ Partition existing = getPartition(tbl, fullPartSpec, false);
+ if (existing != null) {
+ partitionDetails.partition = existing;
+ partitionDetails.hasOldPartition = true;
+ }
+ }
           LOG.info("New loading path = " + entry.getKey() + " withPartSpec " + fullPartSpec);
-
Partition oldPartition = partitionDetails.partition;
List<FileStatus> newFiles = null;
if (partitionDetails.newFiles != null) {
@@ -3108,7 +3104,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
resetStatistics, writeId, stmtId, tbd.isInsertOverwrite(),
isTxnTable, newFiles, tbd.isDirectInsert());
           // if the partition already existed before the loading, no need to add it again to the
           // metastore
-
if (tableSnapshot != null) {
partition.getTPartition().setWriteId(tableSnapshot.getWriteId());
}