This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 06e3fb94a [core] Optimize memory of compact coordinator (#4097)
06e3fb94a is described below
commit 06e3fb94a7427bddd1968df3184f279f93f356d7
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 30 10:03:29 2024 +0800
[core] Optimize memory of compact coordinator (#4097)
---
.../main/java/org/apache/paimon/CoreOptions.java | 2 -
.../UnawareAppendTableCompactionCoordinator.java | 172 ++++++++++++++-------
.../UnawareAppendDeletionFileMaintainer.java | 4 +
.../paimon/operation/AbstractFileStoreScan.java | 9 +-
.../org/apache/paimon/operation/FileStoreScan.java | 2 +-
.../paimon/table/source/AbstractDataTableScan.java | 1 -
.../paimon/table/source/DataTableStreamScan.java | 3 -
.../ContinuousAppendAndCompactFollowUpScanner.java | 54 -------
.../snapshot/FileCreationTimeStartingScanner.java | 3 +-
.../table/source/snapshot/SnapshotReader.java | 3 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 5 +-
.../apache/paimon/table/system/AuditLogTable.java | 4 +-
.../append/AppendOnlyTableCompactionTest.java | 51 ++++--
...tinuousAppendAndCompactFollowUpScannerTest.java | 149 ------------------
14 files changed, 168 insertions(+), 294 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4c69942b6..21eee2a14 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2347,8 +2347,6 @@ public class CoreOptions implements Serializable {
public enum StreamScanMode implements DescribedEnum {
NONE("none", "No requirement."),
COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for traditional bucket table."),
- COMPACT_APPEND_NO_BUCKET(
- "compact-append-no-bucket", "Compaction for append table with
bucket unaware."),
FILE_MONITOR("file-monitor", "Monitor data file changes.");
private final String value;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index e545a0d5c..53068cc63 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
@@ -26,12 +27,16 @@ import
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintai
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -43,8 +48,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
+
/**
* Compact coordinator for append only tables.
*
@@ -65,15 +73,16 @@ public class UnawareAppendTableCompactionCoordinator {
protected static final int REMOVE_AGE = 10;
protected static final int COMPACT_AGE = 5;
- @Nullable private final Long snapshotId;
- private final InnerTableScan scan;
+ private final SnapshotManager snapshotManager;
+ private final SnapshotReader snapshotReader;
private final long targetFileSize;
private final long compactionFileSize;
private final int minFileNum;
private final int maxFileNum;
private final boolean streamingMode;
- private final IndexFileHandler indexFileHandler;
- private final boolean deletionVectorEnabled;
+ private final DvMaintainerCache dvMaintainerCache;
+
+ @Nullable private Long nextSnapshot = null;
final Map<BinaryRow, PartitionCompactCoordinator>
partitionCompactCoordinators =
new HashMap<>();
@@ -89,25 +98,22 @@ public class UnawareAppendTableCompactionCoordinator {
public UnawareAppendTableCompactionCoordinator(
FileStoreTable table, boolean isStreaming, @Nullable Predicate
filter) {
Preconditions.checkArgument(table.primaryKeys().isEmpty());
- FileStoreTable tableCopy = table.copy(compactScanType());
- if (isStreaming) {
- scan = tableCopy.newStreamScan();
- } else {
- scan = tableCopy.newScan();
- }
+ this.snapshotManager = table.snapshotManager();
+ this.snapshotReader = table.newSnapshotReader();
if (filter != null) {
- scan.withFilter(filter);
+ snapshotReader.withFilter(filter);
}
- this.snapshotId = table.snapshotManager().latestSnapshotId();
this.streamingMode = isStreaming;
- CoreOptions coreOptions = table.coreOptions();
- this.targetFileSize = coreOptions.targetFileSize(false);
- this.compactionFileSize = coreOptions.compactionFileSize(false);
- this.minFileNum = coreOptions.compactionMinFileNum();
+ CoreOptions options = table.coreOptions();
+ this.targetFileSize = options.targetFileSize(false);
+ this.compactionFileSize = options.compactionFileSize(false);
+ this.minFileNum = options.compactionMinFileNum();
// this is global compaction, avoid too many compaction tasks
- this.maxFileNum = coreOptions.compactionMaxFileNum().orElse(50);
- this.indexFileHandler = table.store().newIndexFileHandler();
- this.deletionVectorEnabled = coreOptions.deletionVectorsEnabled();
+ this.maxFileNum = options.compactionMaxFileNum().orElse(50);
+ this.dvMaintainerCache =
+ options.deletionVectorsEnabled()
+ ? new
DvMaintainerCache(table.store().newIndexFileHandler())
+ : null;
}
public List<UnawareAppendCompactionTask> run() {
@@ -122,15 +128,11 @@ public class UnawareAppendTableCompactionCoordinator {
@VisibleForTesting
boolean scan() {
- List<Split> splits;
+ List<DataSplit> splits;
boolean hasResult = false;
- while (!(splits = scan.plan().splits()).isEmpty()) {
+ while (!(splits = plan()).isEmpty()) {
hasResult = true;
- splits.forEach(
- split -> {
- DataSplit dataSplit = (DataSplit) split;
- notifyNewFiles(dataSplit.partition(),
dataSplit.dataFiles());
- });
+ splits.forEach(split -> notifyNewFiles(split.partition(),
split.dataFiles()));
// batch mode, we don't do continuous scanning
if (!streamingMode) {
break;
@@ -140,19 +142,59 @@ public class UnawareAppendTableCompactionCoordinator {
}
@VisibleForTesting
- void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
- UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer;
- if (deletionVectorEnabled) {
- dvIndexFileMaintainer =
- AppendDeletionFileMaintainer.forUnawareAppend(
- indexFileHandler, snapshotId, partition);
+ List<DataSplit> plan() {
+ if (nextSnapshot == null) {
+ nextSnapshot = snapshotManager.latestSnapshotId();
+ if (nextSnapshot == null) {
+ return emptyList();
+ }
+ snapshotReader.withMode(ScanMode.ALL);
} else {
- dvIndexFileMaintainer = null;
+ if (!streamingMode) {
+ throw new EndOfScanException();
+ }
+ snapshotReader.withMode(ScanMode.DELTA);
+ }
+
+ if (!snapshotManager.snapshotExists(nextSnapshot)) {
+ return emptyList();
}
+
+ Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
+ nextSnapshot++;
+
+ if (dvMaintainerCache != null) {
+ dvMaintainerCache.refresh();
+ }
+ Filter<ManifestEntry> entryFilter =
+ entry -> {
+ if (entry.file().fileSize() < compactionFileSize) {
+ return true;
+ }
+
+ if (dvMaintainerCache != null) {
+ return dvMaintainerCache
+ .dvMaintainer(entry.partition())
+ .hasDeletionFile(entry.fileName());
+ }
+ return false;
+ };
+ return snapshotReader
+ .withManifestEntryFilter(entryFilter)
+ .withSnapshot(snapshot)
+ .read()
+ .dataSplits();
+ }
+
+ @VisibleForTesting
+ void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
java.util.function.Predicate<DataFileMeta> filter =
file -> {
- if (dvIndexFileMaintainer == null
- ||
dvIndexFileMaintainer.getDeletionFile(file.fileName()) == null) {
+ if (dvMaintainerCache == null
+ || dvMaintainerCache
+ .dvMaintainer(partition)
+ .getDeletionFile(file.fileName())
+ == null) {
return file.fileSize() < compactionFileSize;
}
// if a data file has a deletion file, always be to compact.
@@ -160,9 +202,7 @@ public class UnawareAppendTableCompactionCoordinator {
};
List<DataFileMeta> toCompact =
files.stream().filter(filter).collect(Collectors.toList());
partitionCompactCoordinators
- .computeIfAbsent(
- partition,
- pp -> new
PartitionCompactCoordinator(dvIndexFileMaintainer, partition))
+ .computeIfAbsent(partition, pp -> new
PartitionCompactCoordinator(partition))
.addFiles(toCompact);
}
@@ -196,27 +236,14 @@ public class UnawareAppendTableCompactionCoordinator {
return sets;
}
- private Map<String, String> compactScanType() {
- return new HashMap<String, String>() {
- {
- put(
- CoreOptions.STREAM_SCAN_MODE.key(),
-
CoreOptions.StreamScanMode.COMPACT_APPEND_NO_BUCKET.getValue());
- }
- };
- }
-
/** Coordinator for a single partition. */
class PartitionCompactCoordinator {
- private final UnawareAppendDeletionFileMaintainer
dvIndexFileMaintainer;
private final BinaryRow partition;
private final HashSet<DataFileMeta> toCompact = new HashSet<>();
int age = 0;
- public PartitionCompactCoordinator(
- UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer,
BinaryRow partition) {
- this.dvIndexFileMaintainer = dvIndexFileMaintainer;
+ public PartitionCompactCoordinator(BinaryRow partition) {
this.partition = partition;
}
@@ -248,7 +275,7 @@ public class UnawareAppendTableCompactionCoordinator {
private List<List<DataFileMeta>> agePack() {
List<List<DataFileMeta>> packed;
- if (dvIndexFileMaintainer == null) {
+ if (dvMaintainerCache == null) {
packed = pack(toCompact);
} else {
packed = packInDeletionVectorVMode(toCompact);
@@ -293,7 +320,8 @@ public class UnawareAppendTableCompactionCoordinator {
Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new
HashMap<>();
Set<DataFileMeta> rest = new HashSet<>();
for (DataFileMeta dataFile : toCompact) {
- IndexFileMeta indexFile =
dvIndexFileMaintainer.getIndexFile(dataFile.fileName());
+ IndexFileMeta indexFile =
+
dvMaintainerCache.dvMaintainer(partition).getIndexFile(dataFile.fileName());
if (indexFile == null) {
rest.add(dataFile);
} else {
@@ -335,4 +363,36 @@ public class UnawareAppendTableCompactionCoordinator {
}
}
}
+
+ private class DvMaintainerCache {
+
+ private final IndexFileHandler indexFileHandler;
+
+ /** Should be thread safe, ManifestEntryFilter will be invoked in many threads. */
+ private final Map<BinaryRow, UnawareAppendDeletionFileMaintainer>
cache =
+ new ConcurrentHashMap<>();
+
+ private DvMaintainerCache(IndexFileHandler indexFileHandler) {
+ this.indexFileHandler = indexFileHandler;
+ }
+
+ private void refresh() {
+ this.cache.clear();
+ }
+
+ private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow
partition) {
+ UnawareAppendDeletionFileMaintainer maintainer =
cache.get(partition);
+ if (maintainer == null) {
+ synchronized (this) {
+ maintainer =
+ AppendDeletionFileMaintainer.forUnawareAppend(
+ indexFileHandler,
+ snapshotManager.latestSnapshotId(),
+ partition);
+ }
+ cache.put(partition, maintainer);
+ }
+ return maintainer;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index f764dfc53..abbdeea20 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -100,6 +100,10 @@ public class UnawareAppendDeletionFileMaintainer
implements AppendDeletionFileMa
return UNAWARE_BUCKET;
}
+ public boolean hasDeletionFile(String dataFile) {
+ return this.dataFileToDeletionFile.containsKey(dataFile);
+ }
+
public DeletionFile getDeletionFile(String dataFile) {
return this.dataFileToDeletionFile.get(dataFile);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index cfd1a4bf8..ec3d4a239 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -88,7 +88,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
- private Long dataFileTimeMills = null;
+ private Filter<ManifestEntry> manifestEntryFilter = null;
private Filter<String> fileNameFilter = null;
private ManifestCacheFilter manifestCacheFilter = null;
@@ -196,8 +196,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withDataFileTimeMills(long dataFileTimeMills) {
- this.dataFileTimeMills = dataFileTimeMills;
+ public FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter)
{
+ this.manifestEntryFilter = filter;
return this;
}
@@ -455,8 +455,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
private boolean filterUnmergedManifestEntry(ManifestEntry entry) {
- if (dataFileTimeMills != null
- && entry.file().creationTimeEpochMillis() < dataFileTimeMills)
{
+ if (manifestEntryFilter != null && !manifestEntryFilter.test(entry)) {
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 744f2fe14..c7b0e8cdf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -68,7 +68,7 @@ public interface FileStoreScan {
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
- FileStoreScan withDataFileTimeMills(long dataFileTimeMills);
+ FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 37e66ccdc..ba1bc6588 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -116,7 +116,6 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
checkArgument(
isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected.");
return new ContinuousCompactorStartingScanner(snapshotManager);
- case COMPACT_APPEND_NO_BUCKET:
case FILE_MONITOR:
return new FullStartingScanner(snapshotManager);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 6c02f5746..f315bdfa9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,7 +27,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
-import
org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
@@ -226,8 +225,6 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
switch (type) {
case COMPACT_BUCKET_TABLE:
return new DeltaFollowUpScanner();
- case COMPACT_APPEND_NO_BUCKET:
- return new ContinuousAppendAndCompactFollowUpScanner();
case FILE_MONITOR:
return new AllDeltaFollowUpScanner();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
deleted file mode 100644
index da72e1092..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FollowUpScanner} used internally for stand-alone streaming compact job sources when table
- * is unaware-bucket table.
- */
-public class ContinuousAppendAndCompactFollowUpScanner implements
FollowUpScanner {
-
- private static final Logger LOG =
-
LoggerFactory.getLogger(ContinuousAppendAndCompactFollowUpScanner.class);
-
- @Override
- public boolean shouldScanSnapshot(Snapshot snapshot) {
- if (snapshot.commitKind() == Snapshot.CommitKind.APPEND
- || snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
- return true;
- }
-
- LOG.debug(
- "Next snapshot id {} is not APPEND or COMPACT, but is {},
check next one.",
- snapshot.id(),
- snapshot.commitKind());
- return false;
- }
-
- @Override
- public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader
snapshotReader) {
- return
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshot).read();
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
index a3a87bef2..0ff55152a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
@@ -57,7 +57,8 @@ public class FileCreationTimeStartingScanner extends
AbstractStartingScanner {
snapshotReader
.withMode(ScanMode.ALL)
.withSnapshot(startingSnapshotId)
- .withDataFileTimeMills(startupMillis)
+ .withManifestEntryFilter(
+ entry ->
entry.file().creationTimeEpochMillis() >= startupMillis)
.read());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index c2439de55..96605ceeb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
@@ -62,7 +63,7 @@ public interface SnapshotReader {
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
- SnapshotReader withDataFileTimeMills(long dataFileTimeMills);
+ SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter);
SnapshotReader withBucket(int bucket);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 25c3ffa96..ef3523dfd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
@@ -207,8 +208,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
@Override
- public SnapshotReader withDataFileTimeMills(long dataFileTimeMills) {
- scan.withDataFileTimeMills(dataFileTimeMills);
+ public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry>
filter) {
+ scan.withManifestEntryFilter(filter);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b78444625..50028d890 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -301,8 +301,8 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public SnapshotReader withDataFileTimeMills(long dataFileTimeMills) {
- snapshotReader.withDataFileTimeMills(dataFileTimeMills);
+ public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry>
filter) {
+ snapshotReader.withManifestEntryFilter(filter);
return this;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
index 9f57e3507..ed513626a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
@@ -47,18 +47,43 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
+import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for append table compaction. */
public class AppendOnlyTableCompactionTest {
+ private static final Random random = new Random();
+
@TempDir private Path tempDir;
private FileStoreTable appendOnlyFileStoreTable;
private SnapshotManager snapshotManager;
private UnawareAppendTableCompactionCoordinator compactionCoordinator;
private AppendOnlyFileStoreWrite write;
+ private org.apache.paimon.fs.Path path;
+ private TableSchema tableSchema;
private final String commitUser = UUID.randomUUID().toString();
+ @BeforeEach
+ public void createNegativeAppendOnlyTable() throws Exception {
+ FileIO fileIO = new LocalFileIO();
+ path = new org.apache.paimon.fs.Path(tempDir.toString());
+ tableSchema = new SchemaManager(fileIO, path).createTable(schema());
+ snapshotManager = new SnapshotManager(fileIO, path);
+ recreate();
+ }
+
+ private void recreate() {
+ appendOnlyFileStoreTable =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new org.apache.paimon.fs.Path(tempDir.toString()),
+ tableSchema);
+ compactionCoordinator =
+ new
UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
+ write = (AppendOnlyFileStoreWrite)
appendOnlyFileStoreTable.store().newWrite(commitUser);
+ }
+
@Test
public void noCompaction() throws Exception {
List<CommitMessage> messages = writeCommit(10);
@@ -100,6 +125,15 @@ public class AppendOnlyTableCompactionTest {
assertThat(last.get(0).rowCount()).isEqualTo(11);
}
+ @Test
+ public void testScanSkipBigFiles() throws Exception {
+ List<CommitMessage> messages = writeCommit(11);
+ commit(messages);
+ tableSchema = tableSchema.copy(singletonMap("target-file-size", "1 b"));
+ recreate();
+ assertThat(compactionCoordinator.plan()).isEmpty();
+ }
+
@Test
public void testCompactionLot() throws Exception {
// test continuous compaction
@@ -178,21 +212,4 @@ public class AppendOnlyTableCompactionTest {
BinaryString.fromString("B" + random.nextInt(100)),
BinaryString.fromString("C" + random.nextInt(100)));
}
-
- private static final Random random = new Random();
-
- @BeforeEach
- public void createNegativeAppendOnlyTable() throws Exception {
- FileIO fileIO = new LocalFileIO();
- org.apache.paimon.fs.Path path = new
org.apache.paimon.fs.Path(tempDir.toString());
- SchemaManager schemaManager = new SchemaManager(fileIO, path);
- TableSchema tableSchema = schemaManager.createTable(schema());
- snapshotManager = new SnapshotManager(fileIO, path);
- appendOnlyFileStoreTable =
- FileStoreTableFactory.create(
- fileIO, new
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
- compactionCoordinator =
- new
UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
- write = (AppendOnlyFileStoreWrite)
appendOnlyFileStoreTable.store().newWrite(commitUser);
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
deleted file mode 100644
index c32e4fad0..000000000
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.system.BucketsTable;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link ContinuousAppendAndCompactFollowUpScanner}. */
-public class ContinuousAppendAndCompactFollowUpScannerTest extends
ScannerTestBase {
-
- private static final RowType ROW_TYPE =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
- new String[] {"a", "b", "c"});
-
- private final DataFileMetaSerializer dataFileMetaSerializer = new
DataFileMetaSerializer();
-
- @Test
- public void testScan() throws Exception {
- SnapshotManager snapshotManager = table.snapshotManager();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
-
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(2, 40, 400L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- List<CommitMessage> messageList = new ArrayList<>();
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- write.write(rowData(2, 40, 400L));
- messageList.addAll(write.prepareCommit(true, 1));
- write.write(rowData(1, 10, 100L));
- write.write(rowData(1, 20, 200L));
- messageList.addAll(write.prepareCommit(true, 1));
- commit.commit(1, messageList);
-
- write.compact(binaryRow(1), 0, true);
- commit.commit(2, write.prepareCommit(true, 2));
-
- write.close();
- commit.close();
-
- BucketsTable bucketsTable = new BucketsTable(table, true);
- TableRead read = bucketsTable.newRead();
- ContinuousAppendAndCompactFollowUpScanner scanner =
- new ContinuousAppendAndCompactFollowUpScanner();
-
- Snapshot snapshot = snapshotManager.snapshot(1);
-
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
- assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- TableScan.Plan plan = scanner.scan(snapshot, snapshotReader);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
-
- snapshot = snapshotManager.snapshot(2);
-
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
- assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.scan(snapshot, snapshotReader);
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 2|1|0|2", "+I 2|2|0|1"));
-
- snapshot = snapshotManager.snapshot(3);
-
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
- assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
- plan = scanner.scan(snapshot, snapshotReader);
- assertThat(getResult(read,
plan.splits())).hasSameElementsAs(Arrays.asList("+I 3|1|0|1"));
- }
-
- @Override
- protected String rowDataToString(InternalRow rowData) {
- int numFiles;
- try {
- numFiles =
dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- return String.format(
- "%s %d|%d|%d|%d",
- rowData.getRowKind().shortString(),
- rowData.getLong(0),
- deserializeBinaryRow(rowData.getBinary(1)).getInt(0),
- rowData.getInt(2),
- numFiles);
- }
-
- @Override
- protected FileStoreTable createFileStoreTable(Options conf) throws
Exception {
- SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
- TableSchema tableSchema =
- schemaManager.createTable(
- new Schema(
- ROW_TYPE.getFields(),
- Collections.singletonList("a"),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return FileStoreTableFactory.create(
- fileIO, tablePath, tableSchema, conf,
CatalogEnvironment.empty());
- }
-}