This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0be50e2e22 [core][test] add test case for picking some small files in top level when full compaction (#5753)
0be50e2e22 is described below
commit 0be50e2e22f4c89d72273a9fe650434894dcb877
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Jun 16 17:52:34 2025 +0800
[core][test] add test case for picking some small files in top level when full compaction (#5753)
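
For context on the scenario exercised below: record-level expire removes individual expired records only when their files are rewritten by compaction, so a full compaction that can pick the small files already sitting at the top (max) level lets expired records be dropped without rewriting the large neighboring files. A minimal sketch of the table options behind this scenario follows; the option keys match the Paimon documentation, but the time-field name and the concrete values are illustrative assumptions (the real wiring lives in the test base, not in this diff):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical setup, not part of this commit.
    Map<String, String> options = new HashMap<>();
    options.put("record-level.expire-time", "5 s"); // records whose time field is older than this are dropped
    options.put("record-level.time-field", "col1"); // assumed name of the per-record time column
    options.put("target-file-size", "6000 B");      // small target size, so commits produce small files
    table = table.copy(options);                    // same copy(Map) call the test itself uses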
---
.../apache/paimon/table/RecordLevelExpireTest.java | 58 +++++++++++++++++++++-
1 file changed, 57 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index b24eb527e4..4007df18a1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -41,6 +41,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -262,7 +263,7 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
table = table.copy(map);
int currentSecs = (int) (System.currentTimeMillis() / 1000);
- // if seconds is too short, this test might file
+ // if seconds is too short, this test might fail
int seconds = 5;
// large file A. It has no delete records and expired records, will be upgraded to maxLevel
@@ -307,6 +308,61 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
GenericRow.of(1, 3, currentSecs + 60 * 60));
}
+ @Test
+ public void testPickSmallFilesWhenFullCompact() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put(CoreOptions.TARGET_FILE_SIZE.key(), "6000 B");
+ table = table.copy(map);
+
+ int currentSecs = (int) (System.currentTimeMillis() / 1000);
+ // if seconds is too short, this test might fail
+ int seconds = 5;
+
+ // [1-1000], no expire
+ writeCommit(rows(1, 1000, currentSecs + 60 * 60).toArray(new GenericRow[0]));
+ compact(1);
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(splits.get(0).dataFiles().size()).isEqualTo(1);
+
+ // [1001-1012], 1011 will be expired
+ List<GenericRow> rows2 = rows(1001, 1010, currentSecs + 60 * 60);
+ rows2.add(GenericRow.of(1, 1011, currentSecs + seconds));
+ rows2.add(GenericRow.of(1, 1012, currentSecs + 60 * 60));
+ writeCommit(rows2.toArray(new GenericRow[0]));
+ compact(1);
+ splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.get(0).dataFiles().size()).isEqualTo(2);
+
+ // this will generate 2 files: [2000-2999], [3000-3012]. 3011 will be expired
+ List<GenericRow> rows3 = rows(2000, 3010, currentSecs + 60 * 60);
+ rows3.add(GenericRow.of(1, 3011, currentSecs + seconds));
+ rows3.add(GenericRow.of(1, 3012, currentSecs + 60 * 60));
+ writeCommit(rows3.toArray(new GenericRow[0]));
+ compact(1);
+ splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.get(0).dataFiles().size()).isEqualTo(4);
+ assertThat(splits.get(0).dataFiles().stream().mapToLong(DataFileMeta::rowCount).sum())
+ .isEqualTo(2025);
+
+ // ensure the (1, 1011) and (1, 3011) records written with currentSecs + seconds are out of date
+ Thread.sleep(seconds * 1000 + 2000);
+ // full compaction picks the two small top-level files: [1001-1012] and [3000-3012]
+ compact(1);
+ splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.get(0).dataFiles().size()).isEqualTo(4);
+ assertThat(splits.get(0).dataFiles().stream().mapToLong(DataFileMeta::rowCount).sum())
+ .isEqualTo(2023);
+ }
+
+ private List<GenericRow> rows(int start, int end, int time) {
+ List<GenericRow> rows = new ArrayList<>();
+ for (int i = start; i <= end; i++) {
+ rows.add(GenericRow.of(1, i, time));
+ }
+ return rows;
+ }
+
private void refreshTable() throws Catalog.TableNotExistException {
CatalogContext context =
CatalogContext.create(