InvisibleProgrammer commented on code in PR #3775:
URL: https://github.com/apache/hive/pull/3775#discussion_r1063420879


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Is that a leftover?
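   If the branch is meant to stay, logging the computed values would be more useful than an empty message. A rough sketch, reusing the variables from the diff above (the exact wording and the ci.getFullPartitionName() call are just my assumptions):

       LOG.debug("Bucket sizes are skewed for {}: stdDev={}, mean={}, rsdThreshold={}",
           ci.getFullPartitionName(), standardDeviation, mean, rsdThreshold);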



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Major -> Rebalance -> Minor. As I see it, after this change that is the priority order of the different compaction types. Is that the right order? Does the order matter at all?
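   If I read the change correctly, the resulting flow is roughly this (simplified sketch, not the actual code; shouldRebalance stands for the new bucket-size check and REBALANCE for whatever type the new branch returns):

       if (initiateMajor) return CompactionType.MAJOR;
       if (shouldRebalance) return CompactionType.REBALANCE;
       // otherwise fall through to the existing minor compaction checks

   So a table that qualifies for both would get a rebalance before a minor compaction, which is why I'm asking whether the order matters.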



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("");

Review Comment:
   Should it contain log information indicating that a rebalancing compaction is being initiated?
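   Something along these lines could work, at info level so it shows up in the standard logs (just a sketch; wording and level are up to you, and I'm assuming ci.getFullPartitionName() is available here):

       LOG.info("Initiating rebalance compaction for {} because bucket sizes are skewed", ci.getFullPartitionName());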



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java:
##########
@@ -504,6 +504,47 @@ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&

Review Comment:
   Could you please extract it into a method to improve readability?
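   For example, moving the whole new block into a helper and keeping only the call here (names are placeholders, just to illustrate):

       // hypothetical helper containing the tez/query-based/full-ACID guard and the bucket size deviation check
       if (shouldInitiateRebalance(dir, tblproperties, baseSize, deltaSize)) {
         return CompactionType.REBALANCE; // assuming the new branch returns a rebalance type
       }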



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

