[
https://issues.apache.org/jira/browse/HIVE-26718?focusedWorklogId=837476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837476
]
ASF GitHub Bot logged work on HIVE-26718:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jan/23 13:21
Start Date: 06/Jan/23 13:21
Worklog Time Spent: 10m
Work Description: InvisibleProgrammer commented on code in PR #3775:
URL: https://github.com/apache/hive/pull/3775#discussion_r1063420879
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          // compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          // Relative standard deviation: if the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");
Review Comment:
Is that a leftover?
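For context, the check in the diff boils down to a relative standard deviation (RSD) test over per-bucket byte counts. Below is a minimal standalone sketch of that test (hypothetical class and method names; the mean is taken from the map itself here rather than from baseSize + deltaSize, and the population variance is computed with a plain map/average, whereas the diff's reduce divides the running sum by the bucket count on every accumulation step):

    import java.util.Map;

    public class RsdCheckSketch {

      // True when the bucket sizes are skewed enough to justify a rebalance,
      // i.e. stddev > rsdThreshold * mean.
      static boolean shouldRebalance(Map<Integer, Long> bucketSizes, double rsdThreshold) {
        double mean = bucketSizes.values().stream().mapToLong(Long::longValue).average().orElse(0);
        double variance = bucketSizes.values().stream()
            .mapToDouble(size -> Math.pow(size - mean, 2))
            .average().orElse(0);
        return Math.sqrt(variance) > mean * rsdThreshold;
      }

      public static void main(String[] args) {
        // Bucket 2 is roughly four times the size of the others, so with a
        // threshold of 0.1 the check fires.
        Map<Integer, Long> sizes = Map.of(0, 100L, 1, 110L, 2, 400L);
        System.out.println(shouldRebalance(sizes, 0.1)); // true
      }
    }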
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
Review Comment:
Major -> Rebalance -> Minor. As far as I can see, after this change that is the priority order of the different types of compaction. Is that the right order? Does the order matter at all?
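To make the question concrete, the resulting decision flow would read roughly like this (a condensed sketch, not verbatim from the PR; rebalanceNeeded and initiateMinor are stand-in names, and CompactionType.REBALANCE is assumed to exist on this branch):

    if (initiateMajor) return CompactionType.MAJOR;        // 1. major wins
    if (rebalanceNeeded) return CompactionType.REBALANCE;  // 2. then rebalance
    if (initiateMinor) return CompactionType.MINOR;        // 3. minor is the fallback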
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          // compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          // Relative standard deviation: if the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");
Review Comment:
Should it log that a rebalancing compaction is being initiated?
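One possible shape for such a message, for illustration only (ci.getFullPartitionName() is an assumption about the surrounding CompactionInfo API; the other names come from the diff above):

    LOG.info("Initiating rebalance compaction for {}: bucket size stddev {} exceeds {} * mean {}",
        ci.getFullPartitionName(), standardDeviation, rsdThreshold, mean);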
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
Review Comment:
Could you please extract it into a method to improve readability?
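For example, the guard alone could become something like this (hypothetical method name; it assumes the Initiator's existing conf field, and the parameter type is a guess at what tblproperties is in determineCompactionType):

    // Hypothetical extraction of the eligibility check for the rebalance path.
    private boolean isRebalanceCheckApplicable(Map<String, String> tblproperties) {
      return "tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
          HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
          AcidUtils.isFullAcidTable(tblproperties);
    }

so the caller would reduce to if (isRebalanceCheckApplicable(tblproperties)) { ... }.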
Issue Time Tracking
-------------------
Worklog Id: (was: 837476)
Time Spent: 1h 10m (was: 1h)
> Enable initiator to schedule rebalancing compactions
> ----------------------------------------------------
>
> Key: HIVE-26718
> URL: https://issues.apache.org/jira/browse/HIVE-26718
> Project: Hive
> Issue Type: Sub-task
> Components: Hive
> Reporter: László Végh
> Assignee: László Végh
> Priority: Major
> Labels: ACID, compaction, pull-request-available
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Initiator should be able to schedule rebalancing compactions based on a set
> of predefined and configurable thresholds.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)