This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a44ae965784106f293320979c282c9c9661d74fb Author: Jiao Mingye <[email protected]> AuthorDate: Sun Jun 15 09:38:27 2025 +0800 [core] Support dedicated full compact to external paths (#5674) --- .../shortcodes/generated/core_configuration.html | 6 ++ .../main/java/org/apache/paimon/CoreOptions.java | 11 ++++ .../append/BucketedAppendCompactManager.java | 19 ++++-- .../paimon/mergetree/compact/CompactStrategy.java | 8 ++- .../mergetree/compact/MergeTreeCompactManager.java | 14 ++++- .../mergetree/compact/MergeTreeCompactTask.java | 19 +++--- .../operation/BucketedAppendFileStoreWrite.java | 1 + .../paimon/operation/KeyValueFileStoreWrite.java | 3 +- .../apache/paimon/append/AppendOnlyWriterTest.java | 1 + .../append/BucketedAppendCompactManagerTest.java | 1 + .../apache/paimon/append/FullCompactTaskTest.java | 2 +- .../apache/paimon/format/FileFormatSuffixTest.java | 2 +- .../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +- .../compact/MergeTreeCompactManagerTest.java | 9 ++- .../flink/procedure/CompactProcedureITCase.java | 67 ++++++++++++++++++++++ 15 files changed, 144 insertions(+), 25 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 8cd61f61d6..6bee1104a1 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -182,6 +182,12 @@ under the License. <td>Double</td> <td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td> </tr> + <tr> + <td><h5>compaction.force-compact-all-files</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.</td> + </tr> <tr> <td><h5>compaction.force-up-level-0</h5></td> <td style="word-wrap: break-word;">false</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 1df7949acc..47967be549 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -151,6 +151,13 @@ public class CoreOptions implements Serializable { + ExternalPathStrategy.SPECIFIC_FS + ", should be the prefix scheme of the external path, now supported are s3 and oss."); + public static final ConfigOption<Boolean> COMPACTION_FORCE_COMPACT_ALL_FILES = + key("compaction.force-compact-all-files") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths."); + @ExcludeFromDocumentation("Internal use only") public static final ConfigOption<String> PATH = key("path") @@ -2466,6 +2473,10 @@ public class CoreOptions implements Serializable { return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS); } + public Boolean forceCompactAllFiles() { + return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES); + } + public String partitionTimestampFormatter() { return options.get(PARTITION_TIMESTAMP_FORMATTER); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java index d5c47c3b88..520fd8edb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java @@ -60,6 +60,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { private final PriorityQueue<DataFileMeta> toCompact; private final int minFileNum; private final long targetFileSize; + private final boolean forceCompactAllFiles; private final CompactRewriter rewriter; private List<DataFileMeta> compacting; @@ -72,6 +73,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { @Nullable DeletionVectorsMaintainer dvMaintainer, int minFileNum, long targetFileSize, + boolean forceCompactAllFiles, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { this.executor = executor; @@ -80,6 +82,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { this.toCompact.addAll(restored); this.minFileNum = minFileNum; this.targetFileSize = targetFileSize; + this.forceCompactAllFiles = forceCompactAllFiles; this.rewriter = rewriter; this.metricsReporter = metricsReporter; } @@ -98,9 +101,10 @@ public class BucketedAppendCompactManager extends CompactFutureManager { taskFuture == null, "A compaction task is still running while the user " + "forces a new compaction. This is unexpected."); - // if deletion vector enables, always trigger compaction. - if (toCompact.isEmpty() - || (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE)) { + // if all files are force picked or deletion vector enables, always trigger compaction. + if (!forceCompactAllFiles + && (toCompact.isEmpty() + || (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE))) { return; } @@ -114,6 +118,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { dvMaintainer, toCompact, targetFileSize, + forceCompactAllFiles, rewriter, metricsReporter)); recordCompactionsQueuedRequest(); @@ -238,25 +243,28 @@ public class BucketedAppendCompactManager extends CompactFutureManager { private final DeletionVectorsMaintainer dvMaintainer; private final LinkedList<DataFileMeta> toCompact; private final long targetFileSize; + private final boolean forceCompactAllFiles; private final CompactRewriter rewriter; public FullCompactTask( DeletionVectorsMaintainer dvMaintainer, Collection<DataFileMeta> inputs, long targetFileSize, + boolean forceCompactAllFiles, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { super(metricsReporter); this.dvMaintainer = dvMaintainer; this.toCompact = new LinkedList<>(inputs); this.targetFileSize = targetFileSize; + this.forceCompactAllFiles = forceCompactAllFiles; this.rewriter = rewriter; } @Override protected CompactResult doCompact() throws Exception { // remove large files - while (!toCompact.isEmpty()) { + while (!forceCompactAllFiles && !toCompact.isEmpty()) { DataFileMeta file = toCompact.peekFirst(); // the data file with deletion file always need to be compacted. if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) { @@ -281,7 +289,8 @@ public class BucketedAppendCompactManager extends CompactFutureManager { small++; } } - if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) { + if (forceCompactAllFiles + || (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE)) { return compact(null, toCompact, rewriter); } else { return result(emptyList(), emptyList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java index ec82e9e530..ef56196e0a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java @@ -55,7 +55,8 @@ public interface CompactStrategy { int numLevels, List<LevelSortedRun> runs, @Nullable RecordLevelExpire recordLevelExpire, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable DeletionVectorsMaintainer dvMaintainer, + boolean forceCompactAllFiles) { int maxLevel = numLevels - 1; if (runs.isEmpty()) { // no sorted run, no need to compact @@ -64,7 +65,10 @@ public interface CompactStrategy { List<DataFileMeta> filesToBeCompacted = new ArrayList<>(); for (DataFileMeta file : runs.get(0).run().files()) { - if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) { + if (forceCompactAllFiles) { + // add all files when force compacted + filesToBeCompacted.add(file); + } else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) { // check record level expire for large files filesToBeCompacted.add(file); } else if (dvMaintainer != null diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index a4852e7346..953a235391 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { @Nullable private final DeletionVectorsMaintainer dvMaintainer; private final boolean lazyGenDeletionFile; private final boolean needLookup; + private final boolean forceCompactAllFiles; @Nullable private final RecordLevelExpire recordLevelExpire; @@ -80,7 +81,8 @@ public class MergeTreeCompactManager extends CompactFutureManager { @Nullable DeletionVectorsMaintainer dvMaintainer, boolean lazyGenDeletionFile, boolean needLookup, - @Nullable RecordLevelExpire recordLevelExpire) { + @Nullable RecordLevelExpire recordLevelExpire, + boolean forceCompactAllFiles) { this.executor = executor; this.levels = levels; this.strategy = strategy; @@ -93,6 +95,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { this.lazyGenDeletionFile = lazyGenDeletionFile; this.recordLevelExpire = recordLevelExpire; this.needLookup = needLookup; + this.forceCompactAllFiles = forceCompactAllFiles; MetricUtils.safeCall(this::reportMetrics, LOG); } @@ -135,7 +138,11 @@ public class MergeTreeCompactManager extends CompactFutureManager { } optionalUnit = CompactStrategy.pickFullCompaction( - levels.numberOfLevels(), runs, recordLevelExpire, dvMaintainer); + levels.numberOfLevels(), + runs, + recordLevelExpire, + dvMaintainer, + forceCompactAllFiles); } else { if (taskFuture != null) { return; @@ -210,7 +217,8 @@ public class MergeTreeCompactManager extends CompactFutureManager { metricsReporter, compactDfSupplier, dvMaintainer, - recordLevelExpire); + recordLevelExpire, + forceCompactAllFiles); if (LOG.isDebugEnabled()) { LOG.debug( "Pick these files (name, level, size) for compaction: {}", diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java index 0ce9fbb4c2..4aa10e9b79 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java @@ -55,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask { private int upgradeFilesNum; @Nullable private final RecordLevelExpire recordLevelExpire; + private final boolean forceCompactAllFiles; @Nullable private final DeletionVectorsMaintainer dvMaintainer; public MergeTreeCompactTask( @@ -67,7 +68,8 @@ public class MergeTreeCompactTask extends CompactTask { @Nullable CompactionMetrics.Reporter metricsReporter, Supplier<CompactDeletionFile> compactDfSupplier, @Nullable DeletionVectorsMaintainer dvMaintainer, - @Nullable RecordLevelExpire recordLevelExpire) { + @Nullable RecordLevelExpire recordLevelExpire, + boolean forceCompactAllFiles) { super(metricsReporter); this.minFileSize = minFileSize; this.rewriter = rewriter; @@ -78,6 +80,7 @@ public class MergeTreeCompactTask extends CompactTask { this.dropDelete = dropDelete; this.maxLevel = maxLevel; this.recordLevelExpire = recordLevelExpire; + this.forceCompactAllFiles = forceCompactAllFiles; this.upgradeFilesNum = 0; } @@ -126,12 +129,14 @@ public class MergeTreeCompactTask extends CompactTask { private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception { if (file.level() == outputLevel) { - if (isContainExpiredRecords(file) + if (forceCompactAllFiles + || isContainExpiredRecords(file) || (dvMaintainer != null && dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) { /* - * 1. if the large file in maxLevel has expired records, we need to rewrite it. - * 2. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it. + * 1. if files are force picked, we need to rewrite all files. + * 2. if the large file in maxLevel has expired records, we need to rewrite it. + * 3. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it. */ rewriteFile(file, toUpdate); } @@ -139,9 +144,9 @@ public class MergeTreeCompactTask extends CompactTask { } if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) { - if (isContainExpiredRecords(file)) { - // if the file which could be directly upgraded has expired records, we need to - // rewrite it + if (forceCompactAllFiles || isContainExpiredRecords(file)) { + // if all files are force picked, or the file which could be directly upgraded has + // expired records, we need to rewrite it rewriteFile(file, toUpdate); } else { CompactResult upgradeResult = rewriter.upgrade(outputLevel, file); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index 36201c7f7f..d8f2d9211b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -91,6 +91,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite { dvMaintainer, options.compactionMinFileNum(), options.targetFileSize(false), + options.forceCompactAllFiles(), files -> compactRewrite(partition, bucket, dvFactory, files), compactionMetrics == null ? null diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 9a65e83d06..7c0b36701f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -290,7 +290,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { dvMaintainer, options.prepareCommitWaitCompaction(), options.needLookup(), - recordLevelExpire); + recordLevelExpire, + options.forceCompactAllFiles()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 2b280220ee..8cb293cd8e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -652,6 +652,7 @@ public class AppendOnlyWriterTest { null, MIN_FILE_NUM, targetFileSize, + false, compactBefore -> { latch.await(); return compactBefore.isEmpty() diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java index cfdf38558f..2f031548a8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java @@ -207,6 +207,7 @@ public class BucketedAppendCompactManagerTest { null, minFileNum, targetFileSize, + false, null, // not used null); Optional<List<DataFileMeta>> actual = manager.pickCompactBefore(); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java index e7c3cce01d..262acd5d4f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java @@ -123,7 +123,7 @@ public class FullCompactTaskTest { Collection<DataFileMeta> inputs, long targetFileSize, BucketedAppendCompactManager.CompactRewriter rewriter) { - super(null, inputs, targetFileSize, rewriter, null); + super(null, inputs, targetFileSize, false, rewriter, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index c43b3c20c6..c6761d9c92 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -87,7 +87,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest { SCHEMA, 0, new BucketedAppendCompactManager( - null, toCompact, null, 4, 10, null, null), // not used + null, toCompact, null, 4, 10, false, null, null), // not used null, false, dataFilePathFactory, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 4af6c39800..95daf384e7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -454,7 +454,8 @@ public abstract class MergeTreeTestBase { null, false, options.needLookup(), - null); + null, + false); } static class MockFailResultCompactionManager extends MergeTreeCompactManager { @@ -478,7 +479,8 @@ public abstract class MergeTreeTestBase { null, false, false, - null); + null, + false); } protected CompactResult obtainCompactResult() diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java index 4240555977..4adff94778 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java @@ -208,7 +208,8 @@ public class MergeTreeCompactManagerTest { null, false, true, - null); + null, + false); MergeTreeCompactManager defaultManager = new MergeTreeCompactManager( @@ -223,7 +224,8 @@ public class MergeTreeCompactManagerTest { null, false, false, - null); + null, + false); assertThat(lookupManager.compactNotCompleted()).isTrue(); assertThat(defaultManager.compactNotCompleted()).isFalse(); @@ -259,7 +261,8 @@ public class MergeTreeCompactManagerTest { null, false, false, - null); + null, + false); manager.triggerCompaction(false); manager.getCompactionResult(true); List<LevelMinMax> outputs = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java index d79d13f026..d58c09d3a8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java @@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.Snapshot; import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.StreamTableScan; @@ -35,6 +36,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -196,6 +198,71 @@ public class CompactProcedureITCase extends CatalogITCaseBase { } } + @Test + public void testForceCompactToExternalPath() throws Exception { + // test for pk table + String tmpPath = getTempDirPath("external/" + UUID.randomUUID()); + sql( + "CREATE TABLE Tpk (" + + " k INT," + + " v INT," + + " hh INT," + + " dt STRING," + + " PRIMARY KEY (k, dt, hh) NOT ENFORCED" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'bucket' = '1'" + + ")"); + FileStoreTable pkTable = paimonTable("Tpk"); + + sql( + "INSERT INTO Tpk VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')"); + sql( + "INSERT INTO Tpk VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208'), (2, 100, 15, '20221209')"); + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); + sql( + "CALL sys.compact(`table` => 'default.Tpk'," + + " options => 'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", + tmpPath); + List<DataSplit> splits = pkTable.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splits) { + for (DataFileMeta meta : split.dataFiles()) { + assertThat(meta.externalPath().get().startsWith("file:" + tmpPath)).isTrue(); + } + } + + // test for append table + tmpPath = getTempDirPath("external/" + UUID.randomUUID()); + sql( + "CREATE TABLE Tap (" + + " k INT," + + " v INT," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'bucket' = '1'," + + " 'bucket-key' = 'k'" + + ")"); + FileStoreTable apTable = paimonTable("Tap"); + + sql( + "INSERT INTO Tap VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')"); + sql( + "INSERT INTO Tap VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208'), (2, 100, 15, '20221209')"); + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); + sql( + "CALL sys.compact(`table` => 'default.Tap'," + + " options => 'compaction.force-compact-all-files=true,data-file.external-paths=file://%s,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=file')", + tmpPath); + splits = apTable.newSnapshotReader().read().dataSplits(); + for (DataSplit split : splits) { + for (DataFileMeta meta : split.dataFiles()) { + assertThat(meta.externalPath().get().startsWith("file:" + tmpPath)).isTrue(); + } + } + } + // ----------------------- Sort Compact ----------------------- @Test
