This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 683ac54636 [core] Simplify force rewrite files in Compact Task (#5751)
683ac54636 is described below
commit 683ac546362ed00e98adda50d16e25e4086d4d04
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:16:53 2025 +0800
[core] Simplify force rewrite files in Compact Task (#5751)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 8 +--
.../append/BucketedAppendCompactManager.java | 20 +++----
.../org/apache/paimon/compact/CompactUnit.java | 43 ++++++++------
.../paimon/mergetree/compact/CompactStrategy.java | 35 ++++-------
.../mergetree/compact/FileRewriteCompactTask.java | 67 ++++++++++++++++++++++
.../mergetree/compact/MergeTreeCompactManager.java | 45 +++++++++------
.../mergetree/compact/MergeTreeCompactTask.java | 59 ++++++-------------
.../operation/BucketedAppendFileStoreWrite.java | 2 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 2 +-
.../flink/procedure/CompactProcedureITCase.java | 4 +-
11 files changed, 167 insertions(+), 120 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6bee1104a1..d7b21f18f7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -183,7 +183,7 @@ under the License.
<td>Ratio of the deleted rows in a data file to be forced
compacted for append-only table.</td>
</tr>
<tr>
- <td><h5>compaction.force-compact-all-files</h5></td>
+ <td><h5>compaction.force-rewrite-all-files</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force pick all files for a full compaction. Usually
seen in a compaction task to external paths.</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 47967be549..4c446dc4e5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -151,8 +151,8 @@ public class CoreOptions implements Serializable {
+ ExternalPathStrategy.SPECIFIC_FS
+ ", should be the prefix scheme of the
external path, now supported are s3 and oss.");
- public static final ConfigOption<Boolean>
COMPACTION_FORCE_COMPACT_ALL_FILES =
- key("compaction.force-compact-all-files")
+ public static final ConfigOption<Boolean>
COMPACTION_FORCE_REWRITE_ALL_FILES =
+ key("compaction.force-rewrite-all-files")
.booleanType()
.defaultValue(false)
.withDescription(
@@ -2473,8 +2473,8 @@ public class CoreOptions implements Serializable {
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
}
- public Boolean forceCompactAllFiles() {
- return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
+ public Boolean forceRewriteAllFiles() {
+ return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
}
public String partitionTimestampFormatter() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 520fd8edb3..a27a40e8a9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -60,7 +60,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private final PriorityQueue<DataFileMeta> toCompact;
private final int minFileNum;
private final long targetFileSize;
- private final boolean forceCompactAllFiles;
+ private final boolean forceRewriteAllFiles;
private final CompactRewriter rewriter;
private List<DataFileMeta> compacting;
@@ -73,7 +73,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
@Nullable DeletionVectorsMaintainer dvMaintainer,
int minFileNum,
long targetFileSize,
- boolean forceCompactAllFiles,
+ boolean forceRewriteAllFiles,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
@@ -82,7 +82,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
this.targetFileSize = targetFileSize;
- this.forceCompactAllFiles = forceCompactAllFiles;
+ this.forceRewriteAllFiles = forceRewriteAllFiles;
this.rewriter = rewriter;
this.metricsReporter = metricsReporter;
}
@@ -102,7 +102,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
// if all files are force picked or deletion vectors are enabled, always
trigger compaction.
- if (!forceCompactAllFiles
+ if (!forceRewriteAllFiles
&& (toCompact.isEmpty()
|| (dvMaintainer == null && toCompact.size() <
FULL_COMPACT_MIN_FILE))) {
return;
@@ -118,7 +118,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
dvMaintainer,
toCompact,
targetFileSize,
- forceCompactAllFiles,
+ forceRewriteAllFiles,
rewriter,
metricsReporter));
recordCompactionsQueuedRequest();
@@ -243,28 +243,28 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private final DeletionVectorsMaintainer dvMaintainer;
private final LinkedList<DataFileMeta> toCompact;
private final long targetFileSize;
- private final boolean forceCompactAllFiles;
+ private final boolean forceRewriteAllFiles;
private final CompactRewriter rewriter;
public FullCompactTask(
DeletionVectorsMaintainer dvMaintainer,
Collection<DataFileMeta> inputs,
long targetFileSize,
- boolean forceCompactAllFiles,
+ boolean forceRewriteAllFiles,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.dvMaintainer = dvMaintainer;
this.toCompact = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
- this.forceCompactAllFiles = forceCompactAllFiles;
+ this.forceRewriteAllFiles = forceRewriteAllFiles;
this.rewriter = rewriter;
}
@Override
protected CompactResult doCompact() throws Exception {
// remove large files
- while (!forceCompactAllFiles && !toCompact.isEmpty()) {
+ while (!forceRewriteAllFiles && !toCompact.isEmpty()) {
DataFileMeta file = toCompact.peekFirst();
// a data file with a deletion file always needs to be
compacted.
if (file.fileSize() >= targetFileSize &&
!hasDeletionFile(file)) {
@@ -289,7 +289,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
small++;
}
}
- if (forceCompactAllFiles
+ if (forceRewriteAllFiles
|| (small > big && toCompact.size() >=
FULL_COMPACT_MIN_FILE)) {
return compact(null, toCompact, rewriter);
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
index 9b0fdd26ad..ae367f9171 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
@@ -25,31 +25,40 @@ import java.util.ArrayList;
import java.util.List;
/** A files unit for compaction. */
-public interface CompactUnit {
+public class CompactUnit {
- int outputLevel();
+ private final int outputLevel;
+ private final List<DataFileMeta> files;
+ private final boolean fileRewrite;
- List<DataFileMeta> files();
+ public CompactUnit(int outputLevel, List<DataFileMeta> files, boolean
fileRewrite) {
+ this.outputLevel = outputLevel;
+ this.files = files;
+ this.fileRewrite = fileRewrite;
+ }
+
+ public int outputLevel() {
+ return outputLevel;
+ }
+
+ public List<DataFileMeta> files() {
+ return files;
+ }
+
+ public boolean fileRewrite() {
+ return fileRewrite;
+ }
- static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun>
runs) {
+ public static CompactUnit fromLevelRuns(int outputLevel,
List<LevelSortedRun> runs) {
List<DataFileMeta> files = new ArrayList<>();
for (LevelSortedRun run : runs) {
files.addAll(run.run().files());
}
- return fromFiles(outputLevel, files);
+ return fromFiles(outputLevel, files, false);
}
- static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {
- return new CompactUnit() {
- @Override
- public int outputLevel() {
- return outputLevel;
- }
-
- @Override
- public List<DataFileMeta> files() {
- return files;
- }
- };
+ public static CompactUnit fromFiles(
+ int outputLevel, List<DataFileMeta> files, boolean fileRewrite) {
+ return new CompactUnit(outputLevel, files, fileRewrite);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index ef56196e0a..0ab0981963 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
/** Compact strategy to decide which files to select for compaction. */
public interface CompactStrategy {
@@ -56,16 +55,19 @@ public interface CompactStrategy {
List<LevelSortedRun> runs,
@Nullable RecordLevelExpire recordLevelExpire,
@Nullable DeletionVectorsMaintainer dvMaintainer,
- boolean forceCompactAllFiles) {
+ boolean forceRewriteAllFiles) {
int maxLevel = numLevels - 1;
if (runs.isEmpty()) {
// no sorted run, no need to compact
return Optional.empty();
- } else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
+ }
+
+ // only max level files
+ if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
for (DataFileMeta file : runs.get(0).run().files()) {
- if (forceCompactAllFiles) {
+ if (forceRewriteAllFiles) {
// add all files when force compacted
filesToBeCompacted.add(file);
} else if (recordLevelExpire != null &&
recordLevelExpire.isExpireFile(file)) {
@@ -78,27 +80,14 @@ public interface CompactStrategy {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Pick these files which have expired records or dv
index for full compaction: {}",
- filesToBeCompacted.stream()
- .map(
- file ->
- String.format(
- "(%s, %d, %d)",
- file.fileName(),
- file.level(),
- file.fileSize()))
- .collect(Collectors.joining(", ")));
- }
-
- if (!filesToBeCompacted.isEmpty()) {
- return Optional.of(CompactUnit.fromFiles(maxLevel,
filesToBeCompacted));
- } else {
+ if (filesToBeCompacted.isEmpty()) {
return Optional.empty();
}
- } else {
- return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+
+ return Optional.of(CompactUnit.fromFiles(maxLevel,
filesToBeCompacted, true));
}
+
+ // full compaction
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java
new file mode 100644
index 0000000000..620ea0748a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FileRewriteCompactTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+
+/** Compact task for file rewrite compaction. */
+public class FileRewriteCompactTask extends CompactTask {
+
+ private final CompactRewriter rewriter;
+ private final int outputLevel;
+ private final List<DataFileMeta> files;
+ private final boolean dropDelete;
+
+ public FileRewriteCompactTask(
+ CompactRewriter rewriter,
+ CompactUnit unit,
+ boolean dropDelete,
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
+ super(metricsReporter);
+ this.rewriter = rewriter;
+ this.outputLevel = unit.outputLevel();
+ this.files = unit.files();
+ this.dropDelete = dropDelete;
+ }
+
+ @Override
+ protected CompactResult doCompact() throws Exception {
+ CompactResult result = new CompactResult();
+ for (DataFileMeta file : files) {
+ rewriteFile(file, result);
+ }
+ return result;
+ }
+
+ private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws
Exception {
+ List<List<SortedRun>> candidate =
singletonList(singletonList(SortedRun.fromSingle(file)));
+ toUpdate.merge(rewriter.rewrite(outputLevel, dropDelete, candidate));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 953a235391..025fd20156 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
@@ -65,7 +66,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
private final boolean needLookup;
- private final boolean forceCompactAllFiles;
+ private final boolean forceRewriteAllFiles;
@Nullable private final RecordLevelExpire recordLevelExpire;
@@ -82,7 +83,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire,
- boolean forceCompactAllFiles) {
+ boolean forceRewriteAllFiles) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -95,7 +96,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
this.lazyGenDeletionFile = lazyGenDeletionFile;
this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
- this.forceCompactAllFiles = forceCompactAllFiles;
+ this.forceRewriteAllFiles = forceRewriteAllFiles;
MetricUtils.safeCall(this::reportMetrics, LOG);
}
@@ -142,7 +143,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
runs,
recordLevelExpire,
dvMaintainer,
- forceCompactAllFiles);
+ forceRewriteAllFiles);
} else {
if (taskFuture != null) {
return;
@@ -152,7 +153,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
}
optionalUnit =
strategy.pick(levels.numberOfLevels(), runs)
- .filter(unit -> unit.files().size() > 0)
+ .filter(unit -> !unit.files().isEmpty())
.filter(
unit ->
unit.files().size() > 1
@@ -206,22 +207,28 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
: () ->
CompactDeletionFile.generateFiles(dvMaintainer);
}
- MergeTreeCompactTask task =
- new MergeTreeCompactTask(
- keyComparator,
- compactionFileSize,
- rewriter,
- unit,
- dropDelete,
- levels.maxLevel(),
- metricsReporter,
- compactDfSupplier,
- dvMaintainer,
- recordLevelExpire,
- forceCompactAllFiles);
+ CompactTask task;
+ if (unit.fileRewrite()) {
+ task = new FileRewriteCompactTask(rewriter, unit, dropDelete,
metricsReporter);
+ } else {
+ task =
+ new MergeTreeCompactTask(
+ keyComparator,
+ compactionFileSize,
+ rewriter,
+ unit,
+ dropDelete,
+ levels.maxLevel(),
+ metricsReporter,
+ compactDfSupplier,
+ recordLevelExpire,
+ forceRewriteAllFiles);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Pick these files (name, level, size) for compaction: {}",
+ "Pick these files (name, level, size) for {} compaction:
{}",
+ task.getClass().getSimpleName(),
unit.files().stream()
.map(
file ->
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 4aa10e9b79..667a965c62 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.SortedRun;
@@ -45,19 +44,15 @@ public class MergeTreeCompactTask extends CompactTask {
private final CompactRewriter rewriter;
private final int outputLevel;
private final Supplier<CompactDeletionFile> compactDfSupplier;
-
private final List<List<SortedRun>> partitioned;
-
private final boolean dropDelete;
private final int maxLevel;
+ @Nullable private final RecordLevelExpire recordLevelExpire;
+ private final boolean forceRewriteAllFiles;
// metric
private int upgradeFilesNum;
- @Nullable private final RecordLevelExpire recordLevelExpire;
- private final boolean forceCompactAllFiles;
- @Nullable private final DeletionVectorsMaintainer dvMaintainer;
-
public MergeTreeCompactTask(
Comparator<InternalRow> keyComparator,
long minFileSize,
@@ -67,20 +62,18 @@ public class MergeTreeCompactTask extends CompactTask {
int maxLevel,
@Nullable CompactionMetrics.Reporter metricsReporter,
Supplier<CompactDeletionFile> compactDfSupplier,
- @Nullable DeletionVectorsMaintainer dvMaintainer,
@Nullable RecordLevelExpire recordLevelExpire,
- boolean forceCompactAllFiles) {
+ boolean forceRewriteAllFiles) {
super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
this.compactDfSupplier = compactDfSupplier;
- this.dvMaintainer = dvMaintainer;
this.partitioned = new IntervalPartition(unit.files(),
keyComparator).partition();
this.dropDelete = dropDelete;
this.maxLevel = maxLevel;
this.recordLevelExpire = recordLevelExpire;
- this.forceCompactAllFiles = forceCompactAllFiles;
+ this.forceRewriteAllFiles = forceRewriteAllFiles;
this.upgradeFilesNum = 0;
}
@@ -128,34 +121,19 @@ public class MergeTreeCompactTask extends CompactTask {
}
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws
Exception {
- if (file.level() == outputLevel) {
- if (forceCompactAllFiles
- || isContainExpiredRecords(file)
- || (dvMaintainer != null
- &&
dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
- /*
- * 1. if files are force picked, we need to rewrite all files.
- * 2. if the large file in maxLevel has expired records, we
need to rewrite it.
- * 3. if the large file in maxLevel has corresponding deletion
vector, we need to rewrite it.
- */
- rewriteFile(file, toUpdate);
- }
+ if ((outputLevel == maxLevel && containsDeleteRecords(file))
+ || forceRewriteAllFiles
+ || containsExpiredRecords(file)) {
+ List<List<SortedRun>> candidate = new ArrayList<>();
+ candidate.add(singletonList(SortedRun.fromSingle(file)));
+ rewriteImpl(candidate, toUpdate);
return;
}
- if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d ==
0).orElse(false)) {
- if (forceCompactAllFiles || isContainExpiredRecords(file)) {
- // if all files are force picked, or the file which could be
directly upgraded has
- // expired records, we need to rewrite it
- rewriteFile(file, toUpdate);
- } else {
- CompactResult upgradeResult = rewriter.upgrade(outputLevel,
file);
- toUpdate.merge(upgradeResult);
- upgradeFilesNum++;
- }
- } else {
- // files with delete records should not be upgraded directly to
max level
- rewriteFile(file, toUpdate);
+ if (file.level() != outputLevel) {
+ CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
+ toUpdate.merge(upgradeResult);
+ upgradeFilesNum++;
}
}
@@ -185,14 +163,11 @@ public class MergeTreeCompactTask extends CompactTask {
candidate.clear();
}
- private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws
Exception {
- List<List<SortedRun>> candidate = new ArrayList<>();
- candidate.add(new ArrayList<>());
- candidate.get(0).add(SortedRun.fromSingle(file));
- rewriteImpl(candidate, toUpdate);
+ private boolean containsDeleteRecords(DataFileMeta file) {
+ return file.deleteRowCount().map(d -> d > 0).orElse(true);
}
- private boolean isContainExpiredRecords(DataFileMeta file) {
+ private boolean containsExpiredRecords(DataFileMeta file) {
return recordLevelExpire != null &&
recordLevelExpire.isExpireFile(file);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index d8f2d9211b..03320ad6e1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -91,7 +91,7 @@ public class BucketedAppendFileStoreWrite extends
BaseAppendFileStoreWrite {
dvMaintainer,
options.compactionMinFileNum(),
options.targetFileSize(false),
- options.forceCompactAllFiles(),
+ options.forceRewriteAllFiles(),
files -> compactRewrite(partition, bucket, dvFactory,
files),
compactionMetrics == null
? null
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 7c0b36701f..30fce29821 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -291,7 +291,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
- options.forceCompactAllFiles());
+ options.forceRewriteAllFiles());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index d58c09d3a8..02001ff1af 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -222,7 +222,7 @@ public class CompactProcedureITCase extends
CatalogITCaseBase {
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
sql(
"CALL sys.compact(`table` => 'default.Tpk',"
- + " options =>
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+ + " options =>
'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
tmpPath);
List<DataSplit> splits =
pkTable.newSnapshotReader().read().dataSplits();
for (DataSplit split : splits) {
@@ -253,7 +253,7 @@ public class CompactProcedureITCase extends
CatalogITCaseBase {
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
sql(
"CALL sys.compact(`table` => 'default.Tap',"
- + " options =>
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+ + " options =>
'compaction.force-rewrite-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
tmpPath);
splits = apTable.newSnapshotReader().read().dataSplits();
for (DataSplit split : splits) {