This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9c9452ee7f [core] Support dedicated full compact to external paths
(#5674)
9c9452ee7f is described below
commit 9c9452ee7f4f4b50ee1c99c3fbfc0dd3368f37a8
Author: Jiao Mingye <[email protected]>
AuthorDate: Sun Jun 15 09:38:27 2025 +0800
[core] Support dedicated full compact to external paths (#5674)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 11 ++++
.../append/BucketedAppendCompactManager.java | 19 ++++--
.../paimon/mergetree/compact/CompactStrategy.java | 8 ++-
.../mergetree/compact/MergeTreeCompactManager.java | 14 ++++-
.../mergetree/compact/MergeTreeCompactTask.java | 19 +++---
.../operation/BucketedAppendFileStoreWrite.java | 1 +
.../paimon/operation/KeyValueFileStoreWrite.java | 3 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../append/BucketedAppendCompactManagerTest.java | 1 +
.../apache/paimon/append/FullCompactTaskTest.java | 2 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 2 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +-
.../compact/MergeTreeCompactManagerTest.java | 9 ++-
.../flink/procedure/CompactProcedureITCase.java | 67 ++++++++++++++++++++++
15 files changed, 144 insertions(+), 25 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 8cd61f61d6..6bee1104a1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
<td>Double</td>
<td>Ratio of the deleted rows in a data file to be forced
compacted for append-only table.</td>
</tr>
+ <tr>
+ <td><h5>compaction.force-compact-all-files</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force pick all files for a full compaction. Usually
seen in a compaction task to external paths.</td>
+ </tr>
<tr>
<td><h5>compaction.force-up-level-0</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 1df7949acc..47967be549 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -151,6 +151,13 @@ public class CoreOptions implements Serializable {
+ ExternalPathStrategy.SPECIFIC_FS
+ ", should be the prefix scheme of the
external path, now supported are s3 and oss.");
+ public static final ConfigOption<Boolean>
COMPACTION_FORCE_COMPACT_ALL_FILES =
+ key("compaction.force-compact-all-files")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to force pick all files for a full
compaction. Usually seen in a compaction task to external paths.");
+
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<String> PATH =
key("path")
@@ -2466,6 +2473,10 @@ public class CoreOptions implements Serializable {
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
}
+ public Boolean forceCompactAllFiles() {
+ return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
+ }
+
public String partitionTimestampFormatter() {
return options.get(PARTITION_TIMESTAMP_FORMATTER);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index d5c47c3b88..520fd8edb3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -60,6 +60,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private final PriorityQueue<DataFileMeta> toCompact;
private final int minFileNum;
private final long targetFileSize;
+ private final boolean forceCompactAllFiles;
private final CompactRewriter rewriter;
private List<DataFileMeta> compacting;
@@ -72,6 +73,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
@Nullable DeletionVectorsMaintainer dvMaintainer,
int minFileNum,
long targetFileSize,
+ boolean forceCompactAllFiles,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
@@ -80,6 +82,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
this.targetFileSize = targetFileSize;
+ this.forceCompactAllFiles = forceCompactAllFiles;
this.rewriter = rewriter;
this.metricsReporter = metricsReporter;
}
@@ -98,9 +101,10 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
taskFuture == null,
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
- // if deletion vector enables, always trigger compaction.
- if (toCompact.isEmpty()
- || (dvMaintainer == null && toCompact.size() <
FULL_COMPACT_MIN_FILE)) {
+ // if all files are force picked or deletion vector enables, always
trigger compaction.
+ if (!forceCompactAllFiles
+ && (toCompact.isEmpty()
+ || (dvMaintainer == null && toCompact.size() <
FULL_COMPACT_MIN_FILE))) {
return;
}
@@ -114,6 +118,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
dvMaintainer,
toCompact,
targetFileSize,
+ forceCompactAllFiles,
rewriter,
metricsReporter));
recordCompactionsQueuedRequest();
@@ -238,25 +243,28 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private final DeletionVectorsMaintainer dvMaintainer;
private final LinkedList<DataFileMeta> toCompact;
private final long targetFileSize;
+ private final boolean forceCompactAllFiles;
private final CompactRewriter rewriter;
public FullCompactTask(
DeletionVectorsMaintainer dvMaintainer,
Collection<DataFileMeta> inputs,
long targetFileSize,
+ boolean forceCompactAllFiles,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.dvMaintainer = dvMaintainer;
this.toCompact = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
+ this.forceCompactAllFiles = forceCompactAllFiles;
this.rewriter = rewriter;
}
@Override
protected CompactResult doCompact() throws Exception {
// remove large files
- while (!toCompact.isEmpty()) {
+ while (!forceCompactAllFiles && !toCompact.isEmpty()) {
DataFileMeta file = toCompact.peekFirst();
// the data file with deletion file always need to be
compacted.
if (file.fileSize() >= targetFileSize &&
!hasDeletionFile(file)) {
@@ -281,7 +289,8 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
small++;
}
}
- if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
+ if (forceCompactAllFiles
+ || (small > big && toCompact.size() >=
FULL_COMPACT_MIN_FILE)) {
return compact(null, toCompact, rewriter);
} else {
return result(emptyList(), emptyList());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index ec82e9e530..ef56196e0a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -55,7 +55,8 @@ public interface CompactStrategy {
int numLevels,
List<LevelSortedRun> runs,
@Nullable RecordLevelExpire recordLevelExpire,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable DeletionVectorsMaintainer dvMaintainer,
+ boolean forceCompactAllFiles) {
int maxLevel = numLevels - 1;
if (runs.isEmpty()) {
// no sorted run, no need to compact
@@ -64,7 +65,10 @@ public interface CompactStrategy {
List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
for (DataFileMeta file : runs.get(0).run().files()) {
- if (recordLevelExpire != null &&
recordLevelExpire.isExpireFile(file)) {
+ if (forceCompactAllFiles) {
+ // add all files when force compacted
+ filesToBeCompacted.add(file);
+ } else if (recordLevelExpire != null &&
recordLevelExpire.isExpireFile(file)) {
// check record level expire for large files
filesToBeCompacted.add(file);
} else if (dvMaintainer != null
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index a4852e7346..953a235391 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
private final boolean needLookup;
+ private final boolean forceCompactAllFiles;
@Nullable private final RecordLevelExpire recordLevelExpire;
@@ -80,7 +81,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
@Nullable DeletionVectorsMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
boolean needLookup,
- @Nullable RecordLevelExpire recordLevelExpire) {
+ @Nullable RecordLevelExpire recordLevelExpire,
+ boolean forceCompactAllFiles) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -93,6 +95,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
this.lazyGenDeletionFile = lazyGenDeletionFile;
this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
+ this.forceCompactAllFiles = forceCompactAllFiles;
MetricUtils.safeCall(this::reportMetrics, LOG);
}
@@ -135,7 +138,11 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
}
optionalUnit =
CompactStrategy.pickFullCompaction(
- levels.numberOfLevels(), runs, recordLevelExpire,
dvMaintainer);
+ levels.numberOfLevels(),
+ runs,
+ recordLevelExpire,
+ dvMaintainer,
+ forceCompactAllFiles);
} else {
if (taskFuture != null) {
return;
@@ -210,7 +217,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
metricsReporter,
compactDfSupplier,
dvMaintainer,
- recordLevelExpire);
+ recordLevelExpire,
+ forceCompactAllFiles);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 0ce9fbb4c2..4aa10e9b79 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -55,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
private int upgradeFilesNum;
@Nullable private final RecordLevelExpire recordLevelExpire;
+ private final boolean forceCompactAllFiles;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
public MergeTreeCompactTask(
@@ -67,7 +68,8 @@ public class MergeTreeCompactTask extends CompactTask {
@Nullable CompactionMetrics.Reporter metricsReporter,
Supplier<CompactDeletionFile> compactDfSupplier,
@Nullable DeletionVectorsMaintainer dvMaintainer,
- @Nullable RecordLevelExpire recordLevelExpire) {
+ @Nullable RecordLevelExpire recordLevelExpire,
+ boolean forceCompactAllFiles) {
super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
@@ -78,6 +80,7 @@ public class MergeTreeCompactTask extends CompactTask {
this.dropDelete = dropDelete;
this.maxLevel = maxLevel;
this.recordLevelExpire = recordLevelExpire;
+ this.forceCompactAllFiles = forceCompactAllFiles;
this.upgradeFilesNum = 0;
}
@@ -126,12 +129,14 @@ public class MergeTreeCompactTask extends CompactTask {
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws
Exception {
if (file.level() == outputLevel) {
- if (isContainExpiredRecords(file)
+ if (forceCompactAllFiles
+ || isContainExpiredRecords(file)
|| (dvMaintainer != null
&&
dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
/*
- * 1. if the large file in maxLevel has expired records, we
need to rewrite it.
- * 2. if the large file in maxLevel has corresponding deletion
vector, we need to rewrite it.
+ * 1. if files are force picked, we need to rewrite all files.
+ * 2. if the large file in maxLevel has expired records, we
need to rewrite it.
+ * 3. if the large file in maxLevel has corresponding deletion
vector, we need to rewrite it.
*/
rewriteFile(file, toUpdate);
}
@@ -139,9 +144,9 @@ public class MergeTreeCompactTask extends CompactTask {
}
if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d ==
0).orElse(false)) {
- if (isContainExpiredRecords(file)) {
- // if the file which could be directly upgraded has expired
records, we need to
- // rewrite it
+ if (forceCompactAllFiles || isContainExpiredRecords(file)) {
+ // if all files are force picked, or the file which could be
directly upgraded has
+ // expired records, we need to rewrite it
rewriteFile(file, toUpdate);
} else {
CompactResult upgradeResult = rewriter.upgrade(outputLevel,
file);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 36201c7f7f..d8f2d9211b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -91,6 +91,7 @@ public class BucketedAppendFileStoreWrite extends
BaseAppendFileStoreWrite {
dvMaintainer,
options.compactionMinFileNum(),
options.targetFileSize(false),
+ options.forceCompactAllFiles(),
files -> compactRewrite(partition, bucket, dvFactory,
files),
compactionMetrics == null
? null
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 9a65e83d06..7c0b36701f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -290,7 +290,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
- recordLevelExpire);
+ recordLevelExpire,
+ options.forceCompactAllFiles());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2b280220ee..8cb293cd8e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -652,6 +652,7 @@ public class AppendOnlyWriterTest {
null,
MIN_FILE_NUM,
targetFileSize,
+ false,
compactBefore -> {
latch.await();
return compactBefore.isEmpty()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
index cfdf38558f..2f031548a8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
@@ -207,6 +207,7 @@ public class BucketedAppendCompactManagerTest {
null,
minFileNum,
targetFileSize,
+ false,
null, // not used
null);
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index e7c3cce01d..262acd5d4f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
Collection<DataFileMeta> inputs,
long targetFileSize,
BucketedAppendCompactManager.CompactRewriter rewriter) {
- super(null, inputs, targetFileSize, rewriter, null);
+ super(null, inputs, targetFileSize, false, rewriter, null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index c43b3c20c6..c6761d9c92 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -87,7 +87,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
SCHEMA,
0,
new BucketedAppendCompactManager(
- null, toCompact, null, 4, 10, null, null), //
not used
+ null, toCompact, null, 4, 10, false, null,
null), // not used
null,
false,
dataFilePathFactory,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 4af6c39800..95daf384e7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -454,7 +454,8 @@ public abstract class MergeTreeTestBase {
null,
false,
options.needLookup(),
- null);
+ null,
+ false);
}
static class MockFailResultCompactionManager extends
MergeTreeCompactManager {
@@ -478,7 +479,8 @@ public abstract class MergeTreeTestBase {
null,
false,
false,
- null);
+ null,
+ false);
}
protected CompactResult obtainCompactResult()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4240555977..4adff94778 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -208,7 +208,8 @@ public class MergeTreeCompactManagerTest {
null,
false,
true,
- null);
+ null,
+ false);
MergeTreeCompactManager defaultManager =
new MergeTreeCompactManager(
@@ -223,7 +224,8 @@ public class MergeTreeCompactManagerTest {
null,
false,
false,
- null);
+ null,
+ false);
assertThat(lookupManager.compactNotCompleted()).isTrue();
assertThat(defaultManager.compactNotCompleted()).isFalse();
@@ -259,7 +261,8 @@ public class MergeTreeCompactManagerTest {
null,
false,
false,
- null);
+ null,
+ false);
manager.triggerCompaction(false);
manager.getCompactionResult(true);
List<LevelMinMax> outputs =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index d79d13f026..d58c09d3a8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.StreamTableScan;
@@ -35,6 +36,7 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -196,6 +198,71 @@ public class CompactProcedureITCase extends
CatalogITCaseBase {
}
}
+ @Test
+ public void testForceCompactToExternalPath() throws Exception {
+ // test for pk table
+ String tmpPath = getTempDirPath("external/" + UUID.randomUUID());
+ sql(
+ "CREATE TABLE Tpk ("
+ + " k INT,"
+ + " v INT,"
+ + " hh INT,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable pkTable = paimonTable("Tpk");
+
+ sql(
+ "INSERT INTO Tpk VALUES (1, 100, 15, '20221208'), (1, 100, 16,
'20221208'), (1, 100, 15, '20221209')");
+ sql(
+ "INSERT INTO Tpk VALUES (2, 100, 15, '20221208'), (2, 100, 16,
'20221208'), (2, 100, 15, '20221209')");
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql(
+ "CALL sys.compact(`table` => 'default.Tpk',"
+ + " options =>
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+ tmpPath);
+ List<DataSplit> splits =
pkTable.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ for (DataFileMeta meta : split.dataFiles()) {
+ assertThat(meta.externalPath().get().startsWith("file:" +
tmpPath)).isTrue();
+ }
+ }
+
+ // test for append table
+ tmpPath = getTempDirPath("external/" + UUID.randomUUID());
+ sql(
+ "CREATE TABLE Tap ("
+ + " k INT,"
+ + " v INT,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '1',"
+ + " 'bucket-key' = 'k'"
+ + ")");
+ FileStoreTable apTable = paimonTable("Tap");
+
+ sql(
+ "INSERT INTO Tap VALUES (1, 100, 15, '20221208'), (1, 100, 16,
'20221208'), (1, 100, 15, '20221209')");
+ sql(
+ "INSERT INTO Tap VALUES (2, 100, 15, '20221208'), (2, 100, 16,
'20221208'), (2, 100, 15, '20221209')");
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql(
+ "CALL sys.compact(`table` => 'default.Tap',"
+ + " options =>
'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')",
+ tmpPath);
+ splits = apTable.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ for (DataFileMeta meta : split.dataFiles()) {
+ assertThat(meta.externalPath().get().startsWith("file:" +
tmpPath)).isTrue();
+ }
+ }
+ }
+
// ----------------------- Sort Compact -----------------------
@Test