This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 718f124d37189659dbebd3b7a3c4cd00d66341a3
Author: JingsongLi <[email protected]>
AuthorDate: Tue Oct 28 21:48:59 2025 +0800

    [core] Refactor HistoryPartitionCluster to load less history partitions
---
 .../append/cluster/HistoryPartitionCluster.java    | 139 ++++++++++-----------
 .../append/cluster/IncrementalClusterManager.java  |  91 +++++---------
 .../paimon/operation/AbstractFileStoreScan.java    |   4 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +-
 .../apache/paimon/operation/ManifestsReader.java   |   8 +-
 .../table/source/snapshot/SnapshotReader.java      |   2 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |   5 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   5 +-
 .../cluster/HistoryPartitionClusterTest.java       |  57 +--------
 .../cluster/IncrementalClusterManagerTest.java     |   2 +-
 10 files changed, 110 insertions(+), 205 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
index cccecdd2be..83f92aa9c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HistoryPartitionCluster.java
@@ -27,7 +27,6 @@ import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.slf4j.Logger;
@@ -39,14 +38,11 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.append.cluster.IncrementalClusterManager.constructPartitionLevels;
@@ -56,70 +52,86 @@ import static org.apache.paimon.append.cluster.IncrementalClusterManager.logForP
 public class HistoryPartitionCluster {
 
     private static final Logger LOG = LoggerFactory.getLogger(HistoryPartitionCluster.class);
 
-    private final InternalRowPartitionComputer partitionComputer;
     private final FileStoreTable table;
     private final IncrementalClusterStrategy incrementalClusterStrategy;
-    private final int maxLevel;
-
+    private final InternalRowPartitionComputer partitionComputer;
+    private final PartitionPredicate specifiedPartitions;
+    private final Duration historyPartitionIdleTime;
     private final int historyPartitionLimit;
-    @Nullable private final PartitionPredicate specifiedPartitions;
-    @Nullable private final Duration historyPartitionIdleTime;
-    @Nullable private final BiFilter<Integer, Integer> partitionLevelFilter;
+    private final int maxLevel;
 
     public HistoryPartitionCluster(
             FileStoreTable table,
             IncrementalClusterStrategy incrementalClusterStrategy,
             InternalRowPartitionComputer partitionComputer,
-            int maxLevel,
-            int historyPartitionLimit,
-            @Nullable PartitionPredicate specifiedPartitions,
-            @Nullable Duration historyPartitionIdleTime) {
+            PartitionPredicate specifiedPartitions,
+            Duration historyPartitionIdleTime,
+            int historyPartitionLimit) {
         this.table = table;
         this.incrementalClusterStrategy = incrementalClusterStrategy;
         this.partitionComputer = partitionComputer;
-        this.maxLevel = maxLevel;
-        this.historyPartitionLimit = historyPartitionLimit;
         this.specifiedPartitions = specifiedPartitions;
         this.historyPartitionIdleTime = historyPartitionIdleTime;
-        // (maxLevel + 1) / 2 is used to calculate the ceiling of maxLevel divided by 2
-        this.partitionLevelFilter =
-                (partitionMinLevel, partitionMaxLevel) -> partitionMinLevel < (maxLevel + 1) / 2;
+        this.historyPartitionLimit = historyPartitionLimit;
+        this.maxLevel = table.coreOptions().numLevels() - 1;
+    }
+
+    @Nullable
+    public static HistoryPartitionCluster create(
+            FileStoreTable table,
+            IncrementalClusterStrategy incrementalClusterStrategy,
+            InternalRowPartitionComputer partitionComputer,
+            @Nullable PartitionPredicate specifiedPartitions) {
+        if (table.schema().partitionKeys().isEmpty()) {
+            return null;
+        }
+        if (specifiedPartitions == null) {
+            return null;
+        }
+
+        Duration idleTime = table.coreOptions().clusteringHistoryPartitionIdleTime();
+        if (idleTime == null) {
+            return null;
+        }
+
+        int limit = table.coreOptions().clusteringHistoryPartitionLimit();
+        return new HistoryPartitionCluster(
+                table,
+                incrementalClusterStrategy,
+                partitionComputer,
+                specifiedPartitions,
+                idleTime,
+                limit);
     }
 
-    public Map<BinaryRow, Optional<CompactUnit>> pickForHistoryPartitions() {
+    public Map<BinaryRow, CompactUnit> pickForHistoryPartitions() {
         Map<BinaryRow, List<LevelSortedRun>> partitionLevels = constructLevelsForHistoryPartitions();
         logForPartitionLevel(partitionLevels, partitionComputer);
-        return partitionLevels.entrySet().stream()
-                .collect(
-                        Collectors.toMap(
-                                Map.Entry::getKey,
-                                entry ->
-                                        incrementalClusterStrategy.pick(
-                                                maxLevel, entry.getValue(), true)));
+        Map<BinaryRow, CompactUnit> units = new HashMap<>();
+        partitionLevels.forEach(
+                (k, v) -> {
+                    Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(maxLevel + 1, v, true);
+                    pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+                });
+        return units;
     }
 
     @VisibleForTesting
     public Map<BinaryRow, List<LevelSortedRun>> constructLevelsForHistoryPartitions() {
-        if (specifiedPartitions == null
-                || historyPartitionIdleTime == null
-                || historyPartitionLimit <= 0) {
-            return Collections.emptyMap();
-        }
-
         long historyMilli =
                 LocalDateTime.now()
                         .minus(historyPartitionIdleTime)
                         .atZone(ZoneId.systemDefault())
                         .toInstant()
                        .toEpochMilli();
-        // read partitionEntries filter by partitionLevelFilter historyPartitionIdleTime
-        // sort partitionEntries by lastFileCreation time, and we will pick the oldest N partitions
+
         List<BinaryRow> historyPartitions =
-                table.newSnapshotReader().withManifestLevelFilter(partitionLevelFilter)
-                        .partitionEntries().stream()
+                table.newSnapshotReader().withLevelMinMaxFilter((min, max) -> min < maxLevel)
+                        .withLevelFilter(level -> level < maxLevel).partitionEntries().stream()
                         .filter(entry -> entry.lastFileCreationTime() < historyMilli)
                         .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
                         .map(PartitionEntry::partition)
@@ -131,6 +143,7 @@ public class HistoryPartitionCluster {
                         .withPartitionFilter(historyPartitions)
                         .read()
                         .dataSplits();
+
         Map<BinaryRow, List<DataFileMeta>> historyPartitionFiles = new HashMap<>();
         for (DataSplit dataSplit : historyDataSplits) {
             historyPartitionFiles
@@ -138,52 +151,34 @@ public class HistoryPartitionCluster {
                     .addAll(dataSplit.dataFiles());
         }
 
-        // find history partitions which have low-level files
-        Set<BinaryRow> selectedHistoryPartitions =
-                findLowLevelPartitions(historyPartitions, historyPartitionFiles);
-        historyPartitionFiles =
-                historyPartitionFiles.entrySet().stream()
-                        .filter(entry -> selectedHistoryPartitions.contains(entry.getKey()))
-                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-        return historyPartitionFiles.entrySet().stream()
+        return filterPartitions(historyPartitionFiles).entrySet().stream()
                 .collect(
                         Collectors.toMap(
                                 Map.Entry::getKey,
                                 entry -> constructPartitionLevels(entry.getValue())));
     }
 
-    @VisibleForTesting
-    protected Set<BinaryRow> findLowLevelPartitions(
-            List<BinaryRow> historyPartitions, Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
-        Set<BinaryRow> partitions = new HashSet<>();
-        // 1. the partition is not specified in specifiedPartitions
-        // 2. the min file level in partition should be less than Math.ceil(maxLevel/2)
-        for (BinaryRow historyPartition : historyPartitions) {
-            if (specifiedPartitions != null && !specifiedPartitions.test(historyPartition)) {
-                List<DataFileMeta> files =
-                        partitionFiles.getOrDefault(historyPartition, Collections.emptyList());
-                if (!files.isEmpty()) {
-                    int partitionMinLevel = maxLevel + 1;
-                    for (DataFileMeta file : files) {
-                        partitionMinLevel = Math.min(partitionMinLevel, file.level());
+    private Map<BinaryRow, List<DataFileMeta>> filterPartitions(
+            Map<BinaryRow, List<DataFileMeta>> partitionFiles) {
+        Map<BinaryRow, List<DataFileMeta>> result = new HashMap<>();
+        partitionFiles.forEach(
+                (part, files) -> {
+                    if (specifiedPartitions.test(part)) {
+                        // already contain in specified partitions
+                        return;
                     }
-                    if (partitionLevelFilter != null
-                            && partitionLevelFilter.test(partitionMinLevel, maxLevel)) {
-                        partitions.add(historyPartition);
-                        if (partitions.size() >= historyPartitionLimit) {
-                            break;
+
+                    if (result.size() < historyPartitionLimit) {
+                        // in limit, can be picked
+                        result.put(part, files);
                     }
-                }
-            }
-        }
+                });
         LOG.info(
                 "Find {} history partitions for full clustering, the history partitions are {}",
-                partitions.size(),
-                partitions.stream()
+                result.size(),
+                result.keySet().stream()
                         .map(partitionComputer::generatePartValues)
                        .collect(Collectors.toSet()));
-        return partitions;
+        return result;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 8142f9dbf0..6865b4b82e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.append.cluster;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -52,17 +53,14 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 public class IncrementalClusterManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalClusterManager.class);
 
-    private final InternalRowPartitionComputer partitionComputer;
+    private final InternalRowPartitionComputer partitionComputer;
     private final SnapshotReader snapshotReader;
-
     private final IncrementalClusterStrategy incrementalClusterStrategy;
     private final CoreOptions.OrderType clusterCurve;
     private final List<String> clusterKeys;
-
-    private final HistoryPartitionCluster historyPartitionCluster;
-
-    private int maxLevel;
+    private final int numLevels;
+    private final @Nullable HistoryPartitionCluster historyPartitionCluster;
 
     public IncrementalClusterManager(FileStoreTable table) {
         this(table, null);
     }
@@ -78,7 +76,7 @@ public class IncrementalClusterManager {
                 options.clusteringIncrementalEnabled(),
                 "Only support incremental clustering when '%s' is true.",
                 CLUSTERING_INCREMENTAL.key());
-        this.maxLevel = options.numLevels();
+        this.numLevels = options.numLevels();
         this.partitionComputer =
                 new InternalRowPartitionComputer(
                         table.coreOptions().partitionDefaultName(),
@@ -96,16 +94,9 @@ public class IncrementalClusterManager {
                         options.numSortedRunCompactionTrigger());
         this.clusterCurve = options.clusteringStrategy(options.clusteringColumns().size());
         this.clusterKeys = options.clusteringColumns();
-        this.historyPartitionCluster =
-                new HistoryPartitionCluster(
-                        table,
-                        incrementalClusterStrategy,
-                        partitionComputer,
-                        maxLevel,
-                        options.clusteringHistoryPartitionLimit(),
-                        specifiedPartitions,
-                        options.clusteringHistoryPartitionIdleTime());
+        this.historyPartitionCluster =
+                HistoryPartitionCluster.create(
+                        table, incrementalClusterStrategy, partitionComputer, specifiedPartitions);
     }
 
     public Map<BinaryRow, CompactUnit> prepareForCluster(boolean fullCompaction) {
@@ -114,30 +105,20 @@
         logForPartitionLevel(partitionLevels, partitionComputer);
 
         // 2. pick files to be clustered for each partition
-        Map<BinaryRow, Optional<CompactUnit>> units =
-                partitionLevels.entrySet().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey,
-                                        entry ->
-                                                incrementalClusterStrategy.pick(
-                                                        maxLevel,
-                                                        entry.getValue(),
-                                                        fullCompaction)));
-
-        Map<BinaryRow, Optional<CompactUnit>> historyUnits =
-                historyPartitionCluster.pickForHistoryPartitions();
-        units.putAll(historyUnits);
+        Map<BinaryRow, CompactUnit> units = new HashMap<>();
+        partitionLevels.forEach(
+                (k, v) -> {
+                    Optional<CompactUnit> pick =
+                            incrementalClusterStrategy.pick(numLevels, v, fullCompaction);
+                    pick.ifPresent(compactUnit -> units.put(k, compactUnit));
+                });
+
+        if (historyPartitionCluster != null) {
+            units.putAll(historyPartitionCluster.pickForHistoryPartitions());
+        }
 
-        // 3. filter out empty units
-        Map<BinaryRow, CompactUnit> filteredUnits =
-                units.entrySet().stream()
-                        .filter(entry -> entry.getValue().isPresent())
-                        .collect(
-                                Collectors.toMap(
-                                        Map.Entry::getKey, entry -> entry.getValue().get()));
         if (LOG.isDebugEnabled()) {
-            filteredUnits.forEach(
+            units.forEach(
                     (partition, compactUnit) -> {
                         String filesInfo =
                                 compactUnit.files().stream()
@@ -157,24 +138,17 @@
                                 filesInfo);
                     });
         }
-        return filteredUnits;
+        return units;
     }
 
     public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
         List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
-
-        maxLevel =
-                Math.max(
-                        maxLevel,
-                        dataSplits.stream()
-                                .flatMap(split -> split.dataFiles().stream())
-                                .mapToInt(DataFileMeta::level)
-                                .max()
-                                .orElse(-1) + 1);
-        checkArgument(maxLevel > 1, "Number of levels must be at least 2.");
-
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = getPartitionFiles(dataSplits);
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+        for (DataSplit dataSplit : dataSplits) {
+            partitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
         return partitionFiles.entrySet().stream()
                 .collect(
                         Collectors.toMap(
@@ -208,16 +182,6 @@
         return partitionLevels;
     }
 
-    private Map<BinaryRow, List<DataFileMeta>> getPartitionFiles(List<DataSplit> dataSplits) {
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-        for (DataSplit dataSplit : dataSplits) {
-            partitionFiles
-                    .computeIfAbsent(dataSplit.partition(), k -> new ArrayList<>())
-                    .addAll(dataSplit.dataFiles());
-        }
-        return partitionFiles;
-    }
-
     public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> files) {
         List<DataSplit> splits = new ArrayList<>();
@@ -283,7 +247,8 @@
         return clusterKeys;
     }
 
-    public HistoryPartitionCluster historyPartitionCluster() {
+    @VisibleForTesting
+    HistoryPartitionCluster historyPartitionCluster() {
         return historyPartitionCluster;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 6f06514ed3..7ed77406c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -197,8 +197,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        manifestsReader.withManifestLevelFilter(manifestLevelFilter);
+    public FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        manifestsReader.withLevelMinMaxFilter(minMaxFilter);
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 354c43e019..047b1c3f5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -75,7 +75,7 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
-    FileStoreScan withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter);
+    FileStoreScan withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter);
 
     FileStoreScan enableValueFilter();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index dbf5140583..b3b89e72aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -53,7 +53,7 @@ public class ManifestsReader {
     @Nullable private Integer specifiedBucket = null;
     @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
-    @Nullable private BiFilter<Integer, Integer> manifestLevelFilter = null;
+    @Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
 
     public ManifestsReader(
             RowType partitionType,
@@ -81,8 +81,8 @@ public class ManifestsReader {
         return this;
     }
 
-    public ManifestsReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        this.manifestLevelFilter = manifestLevelFilter;
+    public ManifestsReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        this.levelMinMaxFilter = minMaxFilter;
         return this;
     }
 
@@ -167,7 +167,7 @@ public class ManifestsReader {
                     && (specifiedLevel < minLevel || specifiedLevel > maxLevel)) {
                 return false;
             }
-            if (manifestLevelFilter != null && !manifestLevelFilter.test(minLevel, maxLevel)) {
+            if (levelMinMaxFilter != null && !levelMinMaxFilter.test(minLevel, maxLevel)) {
                 return false;
             }
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 91505ffcf9..ed80f3f92a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -88,7 +88,7 @@ public interface SnapshotReader {
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
-    SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter);
+    SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter);
 
     SnapshotReader enableValueFilter();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 7b1ca7b37e..477ce34e93 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -255,8 +255,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
-    public SnapshotReader withManifestLevelFilter(BiFilter<Integer, Integer> manifestLevelFilter) {
-        scan.withManifestLevelFilter(manifestLevelFilter);
+    @Override
+    public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        scan.withLevelMinMaxFilter(minMaxFilter);
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index f71eba944d..3fd6a8d8af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -361,9 +361,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
     }
 
     @Override
-    public SnapshotReader withManifestLevelFilter(
-            BiFilter<Integer, Integer> manifestLevelFilter) {
-        wrapped.withManifestLevelFilter(manifestLevelFilter);
+    public SnapshotReader withLevelMinMaxFilter(BiFilter<Integer, Integer> minMaxFilter) {
+        wrapped.withLevelMinMaxFilter(minMaxFilter);
         return this;
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
index 4692b50fbb..f1f93ca989 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/HistoryPartitionClusterTest.java
@@ -24,8 +24,6 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
@@ -39,17 +37,12 @@ import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
-import static org.apache.paimon.append.cluster.IncrementalClusterStrategyTest.createFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link HistoryPartitionCluster}. */
@@ -57,53 +50,6 @@ public class HistoryPartitionClusterTest {
 
     @TempDir java.nio.file.Path tempDir;
 
-    @Test
-    public void testFindLowLevelPartitions() throws Exception {
-        FileStoreTable table = createTable(Collections.emptyMap(), Collections.emptyList());
-        long now = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
-        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
-
-        // specified partition, has low-level files
-        BinaryRow partition1 = BinaryRow.singleColumn(1);
-        PartitionEntry partitionEntry1 = new PartitionEntry(partition1, 0, 0, 0, now);
-        partitionFiles.put(
-                partition1, Lists.newArrayList(createFile(100, 1, 3), createFile(100, 1, 5)));
-        // has no low-level files
-        BinaryRow partition2 = BinaryRow.singleColumn(2);
-        PartitionEntry partitionEntry2 = new PartitionEntry(partition2, 0, 0, 0, now);
-        partitionFiles.put(partition2, Lists.newArrayList(createFile(100, 1, 0)));
-        // has low-level files
-        BinaryRow partition3 = BinaryRow.singleColumn(3);
-        PartitionEntry partitionEntry3 = new PartitionEntry(partition3, 0, 0, 0, now);
-        partitionFiles.put(
-                partition3, Lists.newArrayList(createFile(100, 1, 0), createFile(100, 1, 2)));
-        // has no low-level files
-        BinaryRow partition4 = BinaryRow.singleColumn(4);
-        PartitionEntry partitionEntry4 = new PartitionEntry(partition3, 0, 0, 0, now);
-        partitionFiles.put(partition4, Lists.newArrayList(createFile(100, 1, 0)));
-
-        IncrementalClusterManager incrementalClusterManager =
-                new IncrementalClusterManager(
-                        table,
-                        PartitionPredicate.fromMultiple(
-                                RowType.of(DataTypes.INT()), Lists.newArrayList(partition1)));
-        HistoryPartitionCluster historyPartitionCluster =
-                incrementalClusterManager.historyPartitionCluster();
-        Set<BinaryRow> selectedPartitions =
-                historyPartitionCluster.findLowLevelPartitions(
-                        Lists.newArrayList(
-                                        partitionEntry1,
-                                        partitionEntry2,
-                                        partitionEntry3,
-                                        partitionEntry4)
-                                .stream()
-                                .map(PartitionEntry::partition)
-                                .collect(Collectors.toList()),
-                        partitionFiles);
-
-        assertThat(selectedPartitions).contains(partition2);
-    }
-
     @Test
     public void testHistoryPartitionAutoClustering() throws Exception {
         FileStoreTable table = createTable(Collections.emptyMap(), Collections.singletonList("f2"));
@@ -153,8 +99,7 @@ public class HistoryPartitionClusterTest {
 
         // test not specify partition and disable history partition auto clustering
         historyPartitionCluster = new IncrementalClusterManager(table).historyPartitionCluster();
-        partitionLevels = historyPartitionCluster.constructLevelsForHistoryPartitions();
-        assertThat(partitionLevels.isEmpty()).isTrue();
+        assertThat(historyPartitionCluster).isNull();
     }
 
     protected FileStoreTable createTable(
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 37cc006673..4b62caab88 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -58,7 +58,7 @@ public class IncrementalClusterManagerTest {
     @TempDir java.nio.file.Path tempDir;
 
     @Test
-    public void testNonUnAwareBucketTable() throws Exception {
+    public void testNonUnAwareBucketTable() {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.BUCKET.key(), "1");
         options.put(CoreOptions.BUCKET_KEY.key(), "f0");
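For readers skimming the patch: the core change is where the level predicate is applied. Instead of loading file metadata for every history partition and post-filtering it in findLowLevelPartitions, the scan now pushes the predicate into the manifest read via the renamed withLevelMinMaxFilter hook. A minimal sketch of the reader-side chain, mirroring the patched constructLevelsForHistoryPartitions (illustrative only, not part of the commit; `table`, `historyMilli`, and the surrounding imports are assumed to be in scope):

    // maxLevel is the top level of the LSM: numLevels - 1.
    int maxLevel = table.coreOptions().numLevels() - 1;

    List<BinaryRow> historyPartitions =
            table.newSnapshotReader()
                    // skip whole manifest files whose recorded [min, max] level
                    // range shows they only contain fully clustered entries
                    .withLevelMinMaxFilter((min, max) -> min < maxLevel)
                    // drop the remaining top-level entries individually
                    .withLevelFilter(level -> level < maxLevel)
                    .partitionEntries().stream()
                    // keep partitions idle for longer than the configured time,
                    // oldest first
                    .filter(entry -> entry.lastFileCreationTime() < historyMilli)
                    .sorted(Comparator.comparingLong(PartitionEntry::lastFileCreationTime))
                    .map(PartitionEntry::partition)
                    .collect(Collectors.toList());

Because manifests carry min/max level statistics, manifests covering only fully clustered partitions are never opened, which is what lets the refactored code load fewer history partitions.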

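The second structural change is the nullable factory: HistoryPartitionCluster.create returns null when the table is unpartitioned, when no partitions were specified, or when clusteringHistoryPartitionIdleTime() is unset, replacing the old early-return inside constructLevelsForHistoryPartitions. A sketch of the resulting caller-side pattern, as in the patched prepareForCluster (again illustrative; `units` stands for the map of compact units already picked for the specified partitions):

    // May be null: the feature is active only for partitioned tables with
    // specified partitions and a configured history-partition idle time.
    HistoryPartitionCluster cluster =
            HistoryPartitionCluster.create(
                    table, incrementalClusterStrategy, partitionComputer, specifiedPartitions);

    if (cluster != null) {
        // merge history-partition picks into the units for specified partitions
        units.putAll(cluster.pickForHistoryPartitions());
    }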