InvisibleProgrammer commented on code in PR #3775:
URL: https://github.com/apache/hive/pull/3775#discussion_r1063420879
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");
Review Comment:
Is that a leftover?
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
Review Comment:
Major -> Rebalance -> Minor. As I see it, after this change that is the priority
order of the different compaction types. Is that the right order? Does the
order matter at all?
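
To make sure I read it right, here is a condensed, purely illustrative view of the decision flow after the patch (the names below are made up, this is not the actual Initiator code):

    // Illustrative only: the order the checks appear to run in determineCompactionType.
    enum Choice { MAJOR, REBALANCE, MINOR, NONE }

    static Choice pickCompaction(boolean initiateMajor, boolean bucketsSkewed, boolean initiateMinor) {
      if (initiateMajor) return Choice.MAJOR;      // pre-existing check, still evaluated first
      if (bucketsSkewed) return Choice.REBALANCE;  // new check added by this patch
      if (initiateMinor) return Choice.MINOR;      // existing minor compaction logic runs last
      return Choice.NONE;
    }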
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");
Review Comment:
Should it contain a log message stating that a rebalancing compaction is being
initiated?
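
For example, something like this (purely illustrative wording, using only the variables already in scope at that point):

    // Hypothetical replacement for the empty LOG.debug(""):
    LOG.debug("Initiating rebalancing compaction: bucket size standard deviation {} exceeds {} * mean bucket size {}",
        standardDeviation, rsdThreshold, mean);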
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
Review Comment:
Could you please extract it into a method to improve readability?
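
Something like the sketch below is what I have in mind; the method name and signature are made up, and it computes the variance in the plain two-pass way rather than folding it into a reduce, which I find easier to read:

    // Illustrative extraction only, not the actual patch: decide whether the bucket
    // sizes are skewed enough to warrant a rebalancing compaction.
    private boolean shouldInitiateRebalance(AcidDirectory dir, long totalSize, HiveConf conf) {
      Map<Integer, Long> bucketSizes = new HashMap<>();
      // aggregate the size of each bucket file, keyed by bucket id
      dir.getFiles().stream()
          .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
          .forEach(f -> bucketSizes.merge(
              AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
              f.getHdfsFileStatusWithId().getFileStatus().getLen(),
              Long::sum));
      double mean = (double) totalSize / bucketSizes.size();
      // population variance and standard deviation of the per-bucket sizes
      double variance = bucketSizes.values().stream()
          .mapToDouble(size -> Math.pow(size - mean, 2))
          .sum() / bucketSizes.size();
      double standardDeviation = Math.sqrt(variance);
      double rsdThreshold = MetastoreConf.getDoubleVar(conf,
          MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
      // relative standard deviation check: skewed if stddev > threshold * mean
      return standardDeviation > mean * rsdThreshold;
    }

The caller would then just check the returned boolean inside the existing try block.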
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]