SourabhBadhya commented on code in PR #3801:
URL: https://github.com/apache/hive/pull/3801#discussion_r1064288971


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -281,5 +283,146 @@ static void overrideConfProps(HiveConf conf, 
CompactionInfo ci, Map<String, Stri
                 conf.set(property, entry.getValue());
               });
     }
+
+    /**
+     * Decides whether merge compaction must be used for the given directory.
+     * All of the following have to hold, and they are checked in this order:
+     * <ol>
+     *   <li>{@code HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED} is set to true,</li>
+     *   <li>{@code hasDeleteOrAbortedDirectories} reports false for the directory,</li>
+     *   <li>the output format of the storage descriptor is the ORC output format
+     *       (compared case-insensitively by class name).</li>
+     * </ol>
+     * @param conf Hive configuration
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @param storageDescriptor storage descriptor of the underlying table
+     * @return true, if merge compaction must be enabled
+     */
+    static boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+                                     ValidWriteIdList validWriteIdList,
+                                     StorageDescriptor storageDescriptor) {
+      if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)) {
+        return false;
+      }
+      if (hasDeleteOrAbortedDirectories(directory, validWriteIdList)) {
+        return false;
+      }
+      // Constant-first comparison: an unset (null) output format yields false
+      // instead of a NullPointerException.
+      return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"
+              .equalsIgnoreCase(storageDescriptor.getOutputFormat());
+    }
+
+    /**
+     * Scan a directory for delete deltas or aborted directories.
+     * A delete delta only counts when its write ID range lies inside
+     * [min open write ID (1 if there is none), high watermark] of the given write ID list.
+     * Note that a directory without any current (delta) directories is also reported as true.
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @return true, if a delete delta in range or an aborted directory is found, or if there are
+     *         no current directories at all
+     */
+    static boolean hasDeleteOrAbortedDirectories(AcidDirectory directory, ValidWriteIdList validWriteIdList) {
+      if (directory.getCurrentDirectories().isEmpty()) {
+        return true;
+      }
+      final Long minOpenWriteId = validWriteIdList.getMinOpenWriteId();
+      final long lowerBound = minOpenWriteId == null ? 1 : minOpenWriteId;
+      final long upperBound = validWriteIdList.getHighWatermark();
+      for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
+        boolean inRange = delta.getMinWriteId() >= lowerBound && delta.getMaxWriteId() <= upperBound;
+        if (delta.isDeleteDelta() && inRange) {
+          return true;
+        }
+      }
+      // No qualifying delete delta: the answer depends only on aborted directories.
+      return !directory.getAbortedDirectories().isEmpty();
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same 
bucket Id. This method scans all the base
+     * and delta dirs.
+     * @param conf hive configuration, must be not null
+     * @param dir the root directory of delta dirs
+     * @param includeBaseDir true, if the base directory should be scanned
+     * @param isMm
+     * @return map of bucket ID -> bucket files
+     * @throws IOException an error happened during the reading of the 
directory/bucket file
+     */
+    private static Map<Integer, List<Reader>> 
matchBucketIdToBucketFiles(HiveConf conf, AcidDirectory dir,
+                                                                       boolean 
includeBaseDir, boolean isMm) throws IOException {
+      Map<Integer, List<Reader>> result = new HashMap<>();
+      if (includeBaseDir && dir.getBaseDirectory() != null) {
+        getBucketFiles(conf, dir.getBaseDirectory(), isMm, result);
+      }
+      for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+        Path deltaDirPath = deltaDir.getPath();
+        getBucketFiles(conf, deltaDirPath, isMm, result);
+      }
+      return result;
+    }
+
+    /**
+     * Opens an ORC reader for every bucket file listed directly under one directory and appends it
+     * to the list of readers of its bucket ID. This method checks only one directory.
+     * @param conf hive configuration, must be not null
+     * @param dirPath the directory to be scanned.
+     * @param isMm collect bucket files from insert only directories
+     * @param bucketIdToBucketFilePath the result of the scan; lists are created on demand
+     * @throws IOException an error happened during the reading of the directory/bucket file
+     * @throws IllegalArgumentException if a listed file name does not match the bucket pattern
+     */
+    private static void getBucketFiles(HiveConf conf, Path dirPath, boolean isMm,
+                                       Map<Integer, List<Reader>> bucketIdToBucketFilePath) throws IOException {
+      FileSystem fs = dirPath.getFileSystem(conf);
+      FileStatus[] fileStatuses =
+              fs.listStatus(dirPath, isMm ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter);
+      for (FileStatus f : fileStatuses) {
+        final Path fPath = f.getPath();
+        final String fileName = fPath.getName();
+        Matcher matcher = isMm ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(fileName)
+                : AcidUtils.BUCKET_PATTERN.matcher(fileName);
+        if (!matcher.find()) {
+          // The file passed the listing filter but its name cannot be parsed for a bucket ID.
+          String errorMessage = String
+                  .format("Found a bucket file not matching the bucket pattern! %s Matcher=%s", fPath, matcher);
+          LOG.error(errorMessage);
+          throw new IllegalArgumentException(errorMessage);
+        }
+        // Use the first capturing group when the pattern has one, otherwise the whole match.
+        int bucketNum = matcher.groupCount() > 0
+                ? Integer.parseInt(matcher.group(1)) : Integer.parseInt(matcher.group());
+        Reader reader = OrcFile.createReader(fs, fPath);
+        // A lambda is used on purpose: ArrayList::new would bind to ArrayList(int) and take the
+        // bucket ID as the initial capacity.
+        bucketIdToBucketFilePath.computeIfAbsent(bucketNum, k -> new ArrayList<>()).add(reader);
+      }
+    }
+
+    /**
+     * Generate output path for compaction. This can be used to generate delta 
or base directories.
+     * @param conf hive configuration, must be non-null
+     * @param writeIds list of valid write IDs
+     * @param isBaseDir if base directory path should be generated
+     * @param sd the resolved storadge descriptor
+     * @return output path, always non-null
+     */
+    static Path getCompactionOutputDirPath(HiveConf conf, ValidWriteIdList 
writeIds, boolean isBaseDir,
+                                           StorageDescriptor sd) {
+      long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : 
writeIds.getMinOpenWriteId();
+      long highWatermark = writeIds.getHighWatermark();
+      long compactorTxnId = Compactor.getCompactorTxnId(conf);
+      AcidOutputFormat.Options options = new 
AcidOutputFormat.Options(conf).writingBase(isBaseDir)
+              
.writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+              
.maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+      return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), 
options);
+    }
+
+    /**
+     * Merge ORC files from base/delta directories. If the directories 
contains multiple buckets, the result will also
+     * contain the same amount.
+     * @param conf hive configuration
+     * @param includeBaseDir if base directory should be scanned for orc files
+     * @param dir the root directory of the table/partition
+     * @param outputDirPath the result directory path
+     * @param isMm merge orc files from insert only tables
+     * @throws IOException error occurred during file operation
+     */
+    static boolean mergeOrcFiles(HiveConf conf, boolean includeBaseDir, 
AcidDirectory dir,
+                              Path outputDirPath, boolean isMm) throws 
IOException {
+      Map<Integer, List<Reader>> bucketIdToBucketFiles = 
matchBucketIdToBucketFiles(conf, dir, includeBaseDir, isMm);
+      OrcFileMerger fileMerger = new OrcFileMerger(conf);
+      for (Map.Entry<Integer, List<Reader>> e : 
bucketIdToBucketFiles.entrySet()) {
+        fileMerger.checkCompatibility(e.getValue());
+      }
+      boolean isCompatible = true;
+      for (Map.Entry<Integer, List<Reader>> e : 
bucketIdToBucketFiles.entrySet()) {
+        isCompatible &= fileMerger.checkCompatibility(e.getValue());
+      }

Review Comment:
   I did this by mistake. Removed one of the iterations. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to