JingsongLi commented on code in PR #6835:
URL: https://github.com/apache/paimon/pull/6835#discussion_r2645759895


##########
paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java:
##########
@@ -123,196 +128,260 @@ public IncrementalClusterManager(
                         specifiedPartitions);
     }
 
-    public Map<BinaryRow, CompactUnit> createCompactUnits(boolean fullCompaction) {
-        // 1. construct LSM structure for each partition
-        Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevels();
-        logForPartitionLevel(partitionLevels, partitionComputer);
+    public Map<BinaryRow, Map<Integer, CompactUnit>> createCompactUnits(boolean fullCompaction) {
+        // 1. construct LSM structure for each bucket
+        Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
+                constructPartitionLevels();
+        if (LOG.isDebugEnabled()) {
+            logForLevels(partitionLevels, partitionComputer);
+        }
 
-        // 2. pick files to be clustered for each partition
-        Map<BinaryRow, CompactUnit> units = new HashMap<>();
+        // 2. pick files to be clustered for each bucket
+        Map<BinaryRow, Map<Integer, CompactUnit>> units = new HashMap<>();
         partitionLevels.forEach(
-                (k, v) -> {
-                    Optional<CompactUnit> pick =
-                            incrementalClusterStrategy.pick(numLevels, v, fullCompaction);
-                    pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+                (partition, bucketLevels) -> {
+                    Map<Integer, CompactUnit> bucketUnits = new HashMap<>();
+                    bucketLevels.forEach(
+                            (bucket, levels) -> {
+                                Optional<CompactUnit> pick =
+                                        incrementalClusterStrategy.pick(
+                                                numLevels, levels, fullCompaction);
+                                pick.ifPresent(
+                                        compactUnit -> {
+                                            bucketUnits.put(bucket, compactUnit);
+                                            if (LOG.isDebugEnabled()) {
+                                                logForCompactUnits(
+                                                        partitionComputer.generatePartValues(
+                                                                partition),
+                                                        bucket,
+                                                        compactUnit);
+                                            }
+                                        });
+                            });
+                    units.put(partition, bucketUnits);
                 });
 
         if (historyPartitionCluster != null) {
-            units.putAll(historyPartitionCluster.pickForHistoryPartitions());
+            units.putAll(historyPartitionCluster.createHistoryCompactUnits());
         }
 
-        if (LOG.isDebugEnabled()) {
-            units.forEach(
-                    (partition, compactUnit) -> {
-                        String filesInfo =
-                                compactUnit.files().stream()
-                                        .map(
-                                                file ->
-                                                        String.format(
-                                                                "%s,%s,%s",
-                                                                file.fileName(),
-                                                                file.level(),
-                                                                file.fileSize()))
-                                        .collect(Collectors.joining(", "));
-                        LOG.debug(
-                                "Partition {}, outputLevel:{}, clustered with {} files: [{}]",
-                                partitionComputer.generatePartValues(partition),
-                                compactUnit.outputLevel(),
-                                compactUnit.files().size(),
-                                filesInfo);
-                    });
-        }
         return units;
     }
 
-    public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
+    public Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> constructPartitionLevels() {
         List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-        for (DataSplit dataSplit : dataSplits) {
-            partitionFiles
-                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
-                    .addAll(dataSplit.dataFiles());
-        }
-        return partitionFiles.entrySet().stream()
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionLevels =
+                groupByPtAndBucket(dataSplits);
+
+        return partitionLevels.entrySet().stream()
                 .collect(
                         Collectors.toMap(
                                 Map.Entry::getKey,
-                                entry -> constructPartitionLevels(entry.getValue())));
+                                entry -> constructBucketLevels(entry.getValue())));
     }
 
-    public static List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> partitionFiles) {
-        List<LevelSortedRun> partitionLevels = new ArrayList<>();
+    public static Map<Integer, List<LevelSortedRun>> constructBucketLevels(
+            Map<Integer, List<DataFileMeta>> bucketLevels) {
+        return bucketLevels.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey, entry -> constructLevels(entry.getValue())));
+    }
+
+    public static List<LevelSortedRun> constructLevels(List<DataFileMeta> files) {
+        List<LevelSortedRun> levels = new ArrayList<>();
         Map<Integer, List<DataFileMeta>> levelMap =
-                partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));
+                files.stream().collect(Collectors.groupingBy(DataFileMeta::level));
 
         for (Map.Entry<Integer, List<DataFileMeta>> entry : levelMap.entrySet()) {
             int level = entry.getKey();
             if (level == 0) {
                 for (DataFileMeta level0File : entry.getValue()) {
-                    partitionLevels.add(
-                            new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
+                    levels.add(new LevelSortedRun(level, SortedRun.fromSingle(level0File)));
                 }
             } else {
                 // don't need to guarantee that the files within the same sorted run are
                 // non-overlapping here, so we call SortedRun.fromSorted() to avoid sorting and
                 // validation
-                partitionLevels.add(
-                        new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
+                levels.add(new LevelSortedRun(level, SortedRun.fromSorted(entry.getValue())));
             }
         }
 
         // sort by level
-        partitionLevels.sort(Comparator.comparing(LevelSortedRun::level));
+        levels.sort(Comparator.comparing(LevelSortedRun::level));
+        return levels;
+    }
+
+    public static Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupByPtAndBucket(
+            List<DataSplit> dataSplits) {
+        Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionLevels = new HashMap<>();
+        for (DataSplit dataSplit : dataSplits) {
+            BinaryRow partition = dataSplit.partition();
+            Map<Integer, List<DataFileMeta>> bucketLevels = partitionLevels.get(partition);
+            if (bucketLevels == null) {
+                bucketLevels = new HashMap<>();
+                partitionLevels.put(partition.copy(), bucketLevels);
+            }
+            bucketLevels
+                    .computeIfAbsent(dataSplit.bucket(), k -> new ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
         return partitionLevels;
     }
 
-    public Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> toSplitsAndRewriteDvFiles(
-            Map<BinaryRow, CompactUnit> compactUnits) {
-        Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> result = new HashMap<>();
+    public Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>>
+            toSplitsAndRewriteDvFiles(
+                    Map<BinaryRow, Map<Integer, CompactUnit>> compactUnits, BucketMode bucketMode) {
+        Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>> result = new HashMap<>();
         boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
-        for (BinaryRow partition : compactUnits.keySet()) {
-            CompactUnit unit = compactUnits.get(partition);
-            AppendDeleteFileMaintainer dvMaintainer =
-                    dvEnabled
-                            ? BaseAppendDeleteFileMaintainer.forUnawareAppend(
-                                    table.store().newIndexFileHandler(), snapshot, partition)
-                            : null;
-            List<DataSplit> splits = new ArrayList<>();
-
-            DataSplit.Builder builder =
-                    DataSplit.builder()
-                            .withPartition(partition)
-                            .withBucket(0)
-                            .withTotalBuckets(1)
-                            .isStreaming(false);
-
-            SplitGenerator splitGenerator = snapshotReader.splitGenerator();
-            List<SplitGenerator.SplitGroup> splitGroups =
-                    splitGenerator.splitForBatch(unit.files());
-
-            for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
-                List<DataFileMeta> dataFiles = splitGroup.files;
-
-                String bucketPath =
-                        snapshotReader.pathFactory().bucketPath(partition, 0).toString();
-                builder.withDataFiles(dataFiles)
-                        .rawConvertible(splitGroup.rawConvertible)
-                        .withBucketPath(bucketPath);
+        if (bucketMode == BucketMode.HASH_FIXED) {
+            checkArgument(
+                    !dvEnabled,
+                    "Clustering is currently not supported for fixed-bucket tables with deletion vectors enabled.");
+        }
 
-                if (dvMaintainer != null) {
-                    List<DeletionFile> dataDeletionFiles = new ArrayList<>();
-                    for (DataFileMeta file : dataFiles) {
-                        DeletionFile deletionFile =
-                                dvMaintainer.notifyRemovedDeletionVector(file.fileName());
-                        dataDeletionFiles.add(deletionFile);
+        for (BinaryRow partition : compactUnits.keySet()) {
+            Map<Integer, CompactUnit> bucketUnits = compactUnits.get(partition);
+            Map<Integer, Pair<List<DataSplit>, CommitMessage>> bucketSplits = new HashMap<>();
+            for (Integer bucket : bucketUnits.keySet()) {
+                CompactUnit unit = bucketUnits.get(bucket);
+                BaseAppendDeleteFileMaintainer dvMaintainer =
+                        dvEnabled ? getDvMaintainer(bucketMode, partition, bucket) : null;
+                List<DataSplit> splits = new ArrayList<>();
+
+                DataSplit.Builder builder =
+                        DataSplit.builder()
+                                .withPartition(partition)
+                                .withBucket(bucket)
+                                .withTotalBuckets(table.coreOptions().bucket())
+                                .isStreaming(false);
+
+                SplitGenerator splitGenerator = snapshotReader.splitGenerator();
+                List<SplitGenerator.SplitGroup> splitGroups =
+                        splitGenerator.splitForBatch(unit.files());
+
+                for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+                    List<DataFileMeta> dataFiles = splitGroup.files;
+
+                    String bucketPath =
+                            snapshotReader.pathFactory().bucketPath(partition, bucket).toString();
+                    builder.withDataFiles(dataFiles)
+                            .rawConvertible(splitGroup.rawConvertible)
+                            .withBucketPath(bucketPath);
+
+                    if (dvMaintainer != null) {
+                        List<DeletionFile> dataDeletionFiles = new ArrayList<>();
+                        for (DataFileMeta file : dataFiles) {
+                            DeletionFile deletionFile =
+                                    ((AppendDeleteFileMaintainer) dvMaintainer)
+                                            .notifyRemovedDeletionVector(file.fileName());
+                            dataDeletionFiles.add(deletionFile);
+                        }
+                        builder.withDataDeletionFiles(dataDeletionFiles);
                     }
-                    builder.withDataDeletionFiles(dataDeletionFiles);
+                    splits.add(builder.build());
                 }
-                splits.add(builder.build());
-            }
 
-            // generate delete dv index meta
-            CommitMessage dvCommitMessage = null;
-            if (dvMaintainer != null) {
-                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
-                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
-                List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
-                for (IndexManifestEntry entry : indexEntries) {
-                    if (entry.kind() == FileKind.ADD) {
-                        newIndexFiles.add(entry.indexFile());
-                    } else {
-                        deletedIndexFiles.add(entry.indexFile());
+                // generate delete dv index meta
+                CommitMessage dvCommitMessage = null;
+                if (dvMaintainer != null) {
+                    List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                    List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+                    List<IndexManifestEntry> indexEntries = dvMaintainer.persist();
+                    for (IndexManifestEntry entry : indexEntries) {
+                        if (entry.kind() == FileKind.ADD) {
+                            newIndexFiles.add(entry.indexFile());
+                        } else {
+                            deletedIndexFiles.add(entry.indexFile());
+                        }
                     }
+                    dvCommitMessage =
+                            new CommitMessageImpl(
+                                    dvMaintainer.getPartition(),
+                                    bucket,
+                                    table.coreOptions().bucket(),
+                                    DataIncrement.emptyIncrement(),
+                                    new CompactIncrement(
+                                            Collections.emptyList(),
+                                            Collections.emptyList(),
+                                            Collections.emptyList(),
+                                            newIndexFiles,
+                                            deletedIndexFiles));
                 }
-                dvCommitMessage =
-                        new CommitMessageImpl(
-                                dvMaintainer.getPartition(),
-                                0,
-                                table.coreOptions().bucket(),
-                                DataIncrement.emptyIncrement(),
-                                new CompactIncrement(
-                                        Collections.emptyList(),
-                                        Collections.emptyList(),
-                                        Collections.emptyList(),
-                                        newIndexFiles,
-                                        deletedIndexFiles));
+
+                bucketSplits.put(bucket, Pair.of(splits, dvCommitMessage));
             }
 
-            result.put(partition, Pair.of(splits, dvCommitMessage));
+            result.put(partition, bucketSplits);
         }
 
         return result;
     }
 
+    public BaseAppendDeleteFileMaintainer getDvMaintainer(
+            BucketMode bucketMode, BinaryRow partition, int bucket) {
+        switch (bucketMode) {
+            case HASH_FIXED:
+                // TODO: support dv for hash fixed bucket table
+                return null;
+            case BUCKET_UNAWARE:
+                return BaseAppendDeleteFileMaintainer.forUnawareAppend(
+                        table.store().newIndexFileHandler(), snapshot, partition);
+            default:
+                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
+        }
+    }
+
     public static List<DataFileMeta> upgrade(
             List<DataFileMeta> filesAfterCluster, int outputLevel) {
         return filesAfterCluster.stream()
                 .map(file -> file.upgrade(outputLevel))
                 .collect(Collectors.toList());
     }
 
-    public static void logForPartitionLevel(
-            Map<BinaryRow, List<LevelSortedRun>> partitionLevels,
+    public static void logForLevels(
+            Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels,
             InternalRowPartitionComputer partitionComputer) {
-        if (LOG.isDebugEnabled()) {
-            partitionLevels.forEach(
-                    (partition, levelSortedRuns) -> {
-                        String runsInfo =
-                                levelSortedRuns.stream()
-                                        .map(
-                                                lsr ->
-                                                        String.format(
-                                                                "level-%s:%s",
-                                                                lsr.level(),
-                                                                lsr.run().files().size()))
-                                        .collect(Collectors.joining(","));
-                        LOG.debug(
-                                "Partition {} has {} runs: [{}]",
-                                partitionComputer.generatePartValues(partition),
-                                levelSortedRuns.size(),
-                                runsInfo);
-                    });
-        }
+        partitionLevels.forEach(

Review Comment:
   Too deeply nested. Use for loops instead of nested lambdas.
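
   A minimal sketch of the suggested flattening: the same logic as the diff above, with the nested `forEach` lambdas replaced by two explicit for loops. The names (`incrementalClusterStrategy`, `logForCompactUnits`, `partitionComputer`, `numLevels`, `historyPartitionCluster`) are taken from the diff; this is illustrative, not the PR's final code.

   ```java
   // Sketch: createCompactUnits with explicit for loops instead of nested
   // forEach lambdas; same behavior as the lambda version in the diff.
   public Map<BinaryRow, Map<Integer, CompactUnit>> createCompactUnits(boolean fullCompaction) {
       Map<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionLevels =
               constructPartitionLevels();
       if (LOG.isDebugEnabled()) {
           logForLevels(partitionLevels, partitionComputer);
       }

       Map<BinaryRow, Map<Integer, CompactUnit>> units = new HashMap<>();
       for (Map.Entry<BinaryRow, Map<Integer, List<LevelSortedRun>>> partitionEntry :
               partitionLevels.entrySet()) {
           BinaryRow partition = partitionEntry.getKey();
           Map<Integer, CompactUnit> bucketUnits = new HashMap<>();
           for (Map.Entry<Integer, List<LevelSortedRun>> bucketEntry :
                   partitionEntry.getValue().entrySet()) {
               int bucket = bucketEntry.getKey();
               // Pick files to cluster for this bucket, if the strategy finds any.
               Optional<CompactUnit> pick =
                       incrementalClusterStrategy.pick(
                               numLevels, bucketEntry.getValue(), fullCompaction);
               if (pick.isPresent()) {
                   CompactUnit compactUnit = pick.get();
                   bucketUnits.put(bucket, compactUnit);
                   if (LOG.isDebugEnabled()) {
                       logForCompactUnits(
                               partitionComputer.generatePartValues(partition),
                               bucket,
                               compactUnit);
                   }
               }
           }
           units.put(partition, bucketUnits);
       }

       if (historyPartitionCluster != null) {
           units.putAll(historyPartitionCluster.createHistoryCompactUnits());
       }
       return units;
   }
   ```

   With explicit loops the nesting stays at two levels, and the debug-log guard and per-bucket bookkeeping read top-down.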


