This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new db2ccb98ed [core] Reduce the file size per append table compaction task (#5493)
db2ccb98ed is described below
commit db2ccb98ed41002a1b3b85b5f47f5bbd01ce7a02
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 21 11:40:41 2025 +0800
[core] Reduce the file size per append table compaction task (#5493)
---
.../paimon/append/UnawareAppendTableCompactionCoordinator.java | 2 +-
.../append/UnawareAppendTableCompactionCoordinatorTest.java | 10 +++++-----
.../paimon/spark/procedure/CompactProcedureTestBase.scala | 4 ++--
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 565bd5d99d..937a589df2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -330,7 +330,7 @@ public class UnawareAppendTableCompactionCoordinator {
}
public boolean binFull() {
- return totalFileSize >= targetFileSize * 50 && fileNum >= minFileNum;
+ return totalFileSize >= targetFileSize * 2 && fileNum >= minFileNum;
}
}
}
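For context on the change above, a simplified bin-packing sketch (illustrative only; the real coordinator also tracks partitions, file ages and other triggers, and the sizes below are assumed abstract units) shows how lowering the multiplier from 50 to 2 turns a handful of very large compaction tasks into many tasks of roughly two target files' worth each:

import java.util.ArrayList;
import java.util.List;

public class BinFullSketch {

    // Pack file sizes into bins using the same shape of condition as binFull():
    // a bin closes once its total size reaches targetFileSize * sizeMultiplier
    // and it holds at least minFileNum files.
    static List<List<Long>> pack(
            List<Long> fileSizes, long targetFileSize, long sizeMultiplier, int minFileNum) {
        List<List<Long>> bins = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long totalFileSize = 0;
        for (long size : fileSizes) {
            current.add(size);
            totalFileSize += size;
            if (totalFileSize >= targetFileSize * sizeMultiplier && current.size() >= minFileNum) {
                bins.add(current);
                current = new ArrayList<>();
                totalFileSize = 0;
            }
        }
        if (!current.isEmpty()) {
            bins.add(current); // leftover files become one final, smaller task
        }
        return bins;
    }

    public static void main(String[] args) {
        long target = 100; // abstract size units: 100 = one target file
        List<Long> files = new ArrayList<>();
        for (int i = 0; i < 1050; i++) {
            files.add(target / 5); // many small files, each a fifth of the target
        }
        // Old threshold: a task only closed near 50 targets' worth of data.
        System.out.println("tasks with * 50: " + pack(files, target, 50, 2).size()); // prints 5
        // New threshold: a task closes near 2 targets' worth of data.
        System.out.println("tasks with * 2:  " + pack(files, target, 2, 2).size()); // prints 105
    }
}

Under this toy model, 1050 small files that previously packed into 5 tasks now pack into 105, which matches the direction of the updated test expectations below.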
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
index 13f21cdfe0..382926fac6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java
@@ -59,7 +59,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
@Test
public void testForCompactPlan() {
List<DataFileMeta> files = generateNewFiles(200, 0);
- assertTasks(files, 1);
+ assertTasks(files, 2);
}
@Test
@@ -73,7 +73,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
List<DataFileMeta> files =
generateNewFiles(
100,
appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 3 + 1);
- assertTasks(files, 1);
+ assertTasks(files, 17);
}
@Test
@@ -91,14 +91,14 @@ public class UnawareAppendTableCompactionCoordinatorTest {
1000,
appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 10);
compactionCoordinator.notifyNewFiles(partition, files);
- assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(3);
+ assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(56);
files.clear();
files =
generateNewFiles(
1050,
appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 5);
compactionCoordinator.notifyNewFiles(partition, files);
- assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(5);
+ assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(105);
}
@Test
@@ -107,7 +107,7 @@ public class UnawareAppendTableCompactionCoordinatorTest {
generateNewFiles(
1089,
appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 5);
compactionCoordinator.notifyNewFiles(partition, files);
- assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(5);
+ assertThat(compactionCoordinator.compactPlan().size()).isEqualTo(109);
}
@Test
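The updated expectations are roughly what the new threshold predicts: a task closes once it has gathered about 2 x targetFileSize, so the files per task come out near 2 x targetFileSize / fileSize (back-of-the-envelope only; the coordinator's exact packing and its other triggers account for the cases that are not an exact multiple):

files of ~targetFileSize/3  ->  ~6 files per task  ->   100 files ->  16 full tasks + remainder = 17
files of  targetFileSize/5  -> ~10 files per task  ->  1050 files -> 105 tasks; 1089 files -> ~109 tasks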
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 9bea013589..ac09735fca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -767,12 +767,12 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
spark.sql(
"CALL sys.compact(table => 'T', options =>
'source.split.open-file-cost=3200M, compaction.min.file-num=2')")
- // sparkParallelism is 5, task groups is 1, use 1 as the read parallelism
+ // sparkParallelism is 5, task groups is 3, use 3 as the read parallelism
spark.conf.set("spark.sql.shuffle.partitions", 5)
spark.sql(
"CALL sys.compact(table => 'T', options =>
'source.split.open-file-cost=3200M, compaction.min.file-num=2')")
- assertResult(Seq(2, 1))(taskBuffer)
+ assertResult(Seq(2, 3))(taskBuffer)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
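The updated comment implies the procedure's read parallelism follows the number of compaction task groups rather than the full Spark parallelism. A minimal sketch of that relationship, assuming a simple min() rule (an assumed reading for illustration, not code taken from CompactProcedure):

public class ReadParallelismSketch {

    // Assumed rule for illustration: use the task-group count as the read
    // parallelism, capped by the available Spark parallelism.
    static int readParallelism(int sparkParallelism, int taskGroups) {
        return Math.min(sparkParallelism, taskGroups);
    }

    public static void main(String[] args) {
        // Before this change the data packed into 1 task group -> read parallelism 1;
        // with smaller bins it packs into 3 groups -> read parallelism 3, hence Seq(2, 3).
        System.out.println(readParallelism(5, 1)); // prints 1
        System.out.println(readParallelism(5, 3)); // prints 3
    }
}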