This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 06e3fb94a [core] Optimize memory of compact coordinator (#4097)
06e3fb94a is described below

commit 06e3fb94a7427bddd1968df3184f279f93f356d7
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 30 10:03:29 2024 +0800

    [core] Optimize memory of compact coordinator (#4097)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 -
 .../UnawareAppendTableCompactionCoordinator.java   | 172 ++++++++++++++-------
 .../UnawareAppendDeletionFileMaintainer.java       |   4 +
 .../paimon/operation/AbstractFileStoreScan.java    |   9 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +-
 .../paimon/table/source/AbstractDataTableScan.java |   1 -
 .../paimon/table/source/DataTableStreamScan.java   |   3 -
 .../ContinuousAppendAndCompactFollowUpScanner.java |  54 -------
 .../snapshot/FileCreationTimeStartingScanner.java  |   3 +-
 .../table/source/snapshot/SnapshotReader.java      |   3 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |   5 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   4 +-
 .../append/AppendOnlyTableCompactionTest.java      |  51 ++++--
 ...tinuousAppendAndCompactFollowUpScannerTest.java | 149 ------------------
 14 files changed, 168 insertions(+), 294 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4c69942b6..21eee2a14 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2347,8 +2347,6 @@ public class CoreOptions implements Serializable {
     public enum StreamScanMode implements DescribedEnum {
         NONE("none", "No requirement."),
         COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for 
traditional bucket table."),
-        COMPACT_APPEND_NO_BUCKET(
-                "compact-append-no-bucket", "Compaction for append table with 
bucket unaware."),
         FILE_MONITOR("file-monitor", "Monitor data file changes.");
 
         private final String value;
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index e545a0d5c..53068cc63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
@@ -26,12 +27,16 @@ import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintai
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -43,8 +48,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
+
 /**
  * Compact coordinator for append only tables.
  *
@@ -65,15 +73,16 @@ public class UnawareAppendTableCompactionCoordinator {
     protected static final int REMOVE_AGE = 10;
     protected static final int COMPACT_AGE = 5;
 
-    @Nullable private final Long snapshotId;
-    private final InnerTableScan scan;
+    private final SnapshotManager snapshotManager;
+    private final SnapshotReader snapshotReader;
     private final long targetFileSize;
     private final long compactionFileSize;
     private final int minFileNum;
     private final int maxFileNum;
     private final boolean streamingMode;
-    private final IndexFileHandler indexFileHandler;
-    private final boolean deletionVectorEnabled;
+    private final DvMaintainerCache dvMaintainerCache;
+
+    @Nullable private Long nextSnapshot = null;
 
     final Map<BinaryRow, PartitionCompactCoordinator> 
partitionCompactCoordinators =
             new HashMap<>();
@@ -89,25 +98,22 @@ public class UnawareAppendTableCompactionCoordinator {
     public UnawareAppendTableCompactionCoordinator(
             FileStoreTable table, boolean isStreaming, @Nullable Predicate 
filter) {
         Preconditions.checkArgument(table.primaryKeys().isEmpty());
-        FileStoreTable tableCopy = table.copy(compactScanType());
-        if (isStreaming) {
-            scan = tableCopy.newStreamScan();
-        } else {
-            scan = tableCopy.newScan();
-        }
+        this.snapshotManager = table.snapshotManager();
+        this.snapshotReader = table.newSnapshotReader();
         if (filter != null) {
-            scan.withFilter(filter);
+            snapshotReader.withFilter(filter);
         }
-        this.snapshotId = table.snapshotManager().latestSnapshotId();
         this.streamingMode = isStreaming;
-        CoreOptions coreOptions = table.coreOptions();
-        this.targetFileSize = coreOptions.targetFileSize(false);
-        this.compactionFileSize = coreOptions.compactionFileSize(false);
-        this.minFileNum = coreOptions.compactionMinFileNum();
+        CoreOptions options = table.coreOptions();
+        this.targetFileSize = options.targetFileSize(false);
+        this.compactionFileSize = options.compactionFileSize(false);
+        this.minFileNum = options.compactionMinFileNum();
         // this is global compaction, avoid too many compaction tasks
-        this.maxFileNum = coreOptions.compactionMaxFileNum().orElse(50);
-        this.indexFileHandler = table.store().newIndexFileHandler();
-        this.deletionVectorEnabled = coreOptions.deletionVectorsEnabled();
+        this.maxFileNum = options.compactionMaxFileNum().orElse(50);
+        this.dvMaintainerCache =
+                options.deletionVectorsEnabled()
+                        ? new 
DvMaintainerCache(table.store().newIndexFileHandler())
+                        : null;
     }
 
     public List<UnawareAppendCompactionTask> run() {
@@ -122,15 +128,11 @@ public class UnawareAppendTableCompactionCoordinator {
 
     @VisibleForTesting
     boolean scan() {
-        List<Split> splits;
+        List<DataSplit> splits;
         boolean hasResult = false;
-        while (!(splits = scan.plan().splits()).isEmpty()) {
+        while (!(splits = plan()).isEmpty()) {
             hasResult = true;
-            splits.forEach(
-                    split -> {
-                        DataSplit dataSplit = (DataSplit) split;
-                        notifyNewFiles(dataSplit.partition(), 
dataSplit.dataFiles());
-                    });
+            splits.forEach(split -> notifyNewFiles(split.partition(), 
split.dataFiles()));
             // batch mode, we don't do continuous scanning
             if (!streamingMode) {
                 break;
@@ -140,19 +142,59 @@ public class UnawareAppendTableCompactionCoordinator {
     }
 
     @VisibleForTesting
-    void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
-        UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer;
-        if (deletionVectorEnabled) {
-            dvIndexFileMaintainer =
-                    AppendDeletionFileMaintainer.forUnawareAppend(
-                            indexFileHandler, snapshotId, partition);
+    List<DataSplit> plan() {
+        if (nextSnapshot == null) {
+            nextSnapshot = snapshotManager.latestSnapshotId();
+            if (nextSnapshot == null) {
+                return emptyList();
+            }
+            snapshotReader.withMode(ScanMode.ALL);
         } else {
-            dvIndexFileMaintainer = null;
+            if (!streamingMode) {
+                throw new EndOfScanException();
+            }
+            snapshotReader.withMode(ScanMode.DELTA);
+        }
+
+        if (!snapshotManager.snapshotExists(nextSnapshot)) {
+            return emptyList();
         }
+
+        Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
+        nextSnapshot++;
+
+        if (dvMaintainerCache != null) {
+            dvMaintainerCache.refresh();
+        }
+        Filter<ManifestEntry> entryFilter =
+                entry -> {
+                    if (entry.file().fileSize() < compactionFileSize) {
+                        return true;
+                    }
+
+                    if (dvMaintainerCache != null) {
+                        return dvMaintainerCache
+                                .dvMaintainer(entry.partition())
+                                .hasDeletionFile(entry.fileName());
+                    }
+                    return false;
+                };
+        return snapshotReader
+                .withManifestEntryFilter(entryFilter)
+                .withSnapshot(snapshot)
+                .read()
+                .dataSplits();
+    }
+
+    @VisibleForTesting
+    void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
         java.util.function.Predicate<DataFileMeta> filter =
                 file -> {
-                    if (dvIndexFileMaintainer == null
-                            || 
dvIndexFileMaintainer.getDeletionFile(file.fileName()) == null) {
+                    if (dvMaintainerCache == null
+                            || dvMaintainerCache
+                                            .dvMaintainer(partition)
+                                            .getDeletionFile(file.fileName())
+                                    == null) {
                         return file.fileSize() < compactionFileSize;
                     }
                     // if a data file has a deletion file, always be to 
compact.
@@ -160,9 +202,7 @@ public class UnawareAppendTableCompactionCoordinator {
                 };
         List<DataFileMeta> toCompact = 
files.stream().filter(filter).collect(Collectors.toList());
         partitionCompactCoordinators
-                .computeIfAbsent(
-                        partition,
-                        pp -> new 
PartitionCompactCoordinator(dvIndexFileMaintainer, partition))
+                .computeIfAbsent(partition, pp -> new 
PartitionCompactCoordinator(partition))
                 .addFiles(toCompact);
     }
 
@@ -196,27 +236,14 @@ public class UnawareAppendTableCompactionCoordinator {
         return sets;
     }
 
-    private Map<String, String> compactScanType() {
-        return new HashMap<String, String>() {
-            {
-                put(
-                        CoreOptions.STREAM_SCAN_MODE.key(),
-                        
CoreOptions.StreamScanMode.COMPACT_APPEND_NO_BUCKET.getValue());
-            }
-        };
-    }
-
     /** Coordinator for a single partition. */
     class PartitionCompactCoordinator {
 
-        private final UnawareAppendDeletionFileMaintainer 
dvIndexFileMaintainer;
         private final BinaryRow partition;
         private final HashSet<DataFileMeta> toCompact = new HashSet<>();
         int age = 0;
 
-        public PartitionCompactCoordinator(
-                UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer, 
BinaryRow partition) {
-            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
+        public PartitionCompactCoordinator(BinaryRow partition) {
             this.partition = partition;
         }
 
@@ -248,7 +275,7 @@ public class UnawareAppendTableCompactionCoordinator {
 
         private List<List<DataFileMeta>> agePack() {
             List<List<DataFileMeta>> packed;
-            if (dvIndexFileMaintainer == null) {
+            if (dvMaintainerCache == null) {
                 packed = pack(toCompact);
             } else {
                 packed = packInDeletionVectorVMode(toCompact);
@@ -293,7 +320,8 @@ public class UnawareAppendTableCompactionCoordinator {
             Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new 
HashMap<>();
             Set<DataFileMeta> rest = new HashSet<>();
             for (DataFileMeta dataFile : toCompact) {
-                IndexFileMeta indexFile = 
dvIndexFileMaintainer.getIndexFile(dataFile.fileName());
+                IndexFileMeta indexFile =
+                        
dvMaintainerCache.dvMaintainer(partition).getIndexFile(dataFile.fileName());
                 if (indexFile == null) {
                     rest.add(dataFile);
                 } else {
@@ -335,4 +363,36 @@ public class UnawareAppendTableCompactionCoordinator {
             }
         }
     }
+
+    private class DvMaintainerCache {
+
+        private final IndexFileHandler indexFileHandler;
+
+        /** Should be thread safe, ManifestEntryFilter will be invoked in many 
threads. */
+        private final Map<BinaryRow, UnawareAppendDeletionFileMaintainer> 
cache =
+                new ConcurrentHashMap<>();
+
+        private DvMaintainerCache(IndexFileHandler indexFileHandler) {
+            this.indexFileHandler = indexFileHandler;
+        }
+
+        private void refresh() {
+            this.cache.clear();
+        }
+
+        private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow 
partition) {
+            UnawareAppendDeletionFileMaintainer maintainer = 
cache.get(partition);
+            if (maintainer == null) {
+                synchronized (this) {
+                    maintainer =
+                            AppendDeletionFileMaintainer.forUnawareAppend(
+                                    indexFileHandler,
+                                    snapshotManager.latestSnapshotId(),
+                                    partition);
+                }
+                cache.put(partition, maintainer);
+            }
+            return maintainer;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index f764dfc53..abbdeea20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -100,6 +100,10 @@ public class UnawareAppendDeletionFileMaintainer implements AppendDeletionFileMa
         return UNAWARE_BUCKET;
     }
 
+    public boolean hasDeletionFile(String dataFile) {
+        return this.dataFileToDeletionFile.containsKey(dataFile);
+    }
+
     public DeletionFile getDeletionFile(String dataFile) {
         return this.dataFileToDeletionFile.get(dataFile);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index cfd1a4bf8..ec3d4a239 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -88,7 +88,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private List<ManifestFileMeta> specifiedManifests = null;
     protected ScanMode scanMode = ScanMode.ALL;
     private Filter<Integer> levelFilter = null;
-    private Long dataFileTimeMills = null;
+    private Filter<ManifestEntry> manifestEntryFilter = null;
     private Filter<String> fileNameFilter = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
@@ -196,8 +196,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     }
 
     @Override
-    public FileStoreScan withDataFileTimeMills(long dataFileTimeMills) {
-        this.dataFileTimeMills = dataFileTimeMills;
+    public FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter) 
{
+        this.manifestEntryFilter = filter;
         return this;
     }
 
@@ -455,8 +455,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     /** Note: Keep this thread-safe. */
     private boolean filterUnmergedManifestEntry(ManifestEntry entry) {
-        if (dataFileTimeMills != null
-                && entry.file().creationTimeEpochMillis() < dataFileTimeMills) 
{
+        if (manifestEntryFilter != null && !manifestEntryFilter.test(entry)) {
             return false;
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 744f2fe14..c7b0e8cdf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -68,7 +68,7 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
-    FileStoreScan withDataFileTimeMills(long dataFileTimeMills);
+    FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
 
     FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 37e66ccdc..ba1bc6588 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -116,7 +116,6 @@ public abstract class AbstractDataTableScan implements DataTableScan {
                 checkArgument(
                         isStreaming, "Set 'streaming-compact' in batch mode. 
This is unexpected.");
                 return new ContinuousCompactorStartingScanner(snapshotManager);
-            case COMPACT_APPEND_NO_BUCKET:
             case FILE_MONITOR:
                 return new FullStartingScanner(snapshotManager);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 6c02f5746..f315bdfa9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -27,7 +27,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
-import 
org.apache.paimon.table.source.snapshot.ContinuousAppendAndCompactFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
@@ -226,8 +225,6 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
         switch (type) {
             case COMPACT_BUCKET_TABLE:
                 return new DeltaFollowUpScanner();
-            case COMPACT_APPEND_NO_BUCKET:
-                return new ContinuousAppendAndCompactFollowUpScanner();
             case FILE_MONITOR:
                 return new AllDeltaFollowUpScanner();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
deleted file mode 100644
index da72e1092..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScanner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FollowUpScanner} used internally for stand-alone streaming compact 
job sources when table
- * is unaware-bucket table.
- */
-public class ContinuousAppendAndCompactFollowUpScanner implements 
FollowUpScanner {
-
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(ContinuousAppendAndCompactFollowUpScanner.class);
-
-    @Override
-    public boolean shouldScanSnapshot(Snapshot snapshot) {
-        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND
-                || snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
-            return true;
-        }
-
-        LOG.debug(
-                "Next snapshot id {} is not APPEND or COMPACT, but is {}, 
check next one.",
-                snapshot.id(),
-                snapshot.commitKind());
-        return false;
-    }
-
-    @Override
-    public SnapshotReader.Plan scan(Snapshot snapshot, SnapshotReader 
snapshotReader) {
-        return 
snapshotReader.withMode(ScanMode.DELTA).withSnapshot(snapshot).read();
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
index a3a87bef2..0ff55152a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
@@ -57,7 +57,8 @@ public class FileCreationTimeStartingScanner extends AbstractStartingScanner {
                 snapshotReader
                         .withMode(ScanMode.ALL)
                         .withSnapshot(startingSnapshotId)
-                        .withDataFileTimeMills(startupMillis)
+                        .withManifestEntryFilter(
+                                entry -> 
entry.file().creationTimeEpochMillis() >= startupMillis)
                         .read());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index c2439de55..96605ceeb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
@@ -62,7 +63,7 @@ public interface SnapshotReader {
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
-    SnapshotReader withDataFileTimeMills(long dataFileTimeMills);
+    SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter);
 
     SnapshotReader withBucket(int bucket);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 25c3ffa96..ef3523dfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.DefaultValueAssigner;
@@ -207,8 +208,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
     }
 
     @Override
-    public SnapshotReader withDataFileTimeMills(long dataFileTimeMills) {
-        scan.withDataFileTimeMills(dataFileTimeMills);
+    public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> 
filter) {
+        scan.withManifestEntryFilter(filter);
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b78444625..50028d890 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -301,8 +301,8 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public SnapshotReader withDataFileTimeMills(long dataFileTimeMills) {
-            snapshotReader.withDataFileTimeMills(dataFileTimeMills);
+        public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> 
filter) {
+            snapshotReader.withManifestEntryFilter(filter);
             return this;
         }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
index 9f57e3507..ed513626a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java
@@ -47,18 +47,43 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
+import static java.util.Collections.singletonMap;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for append table compaction. */
 public class AppendOnlyTableCompactionTest {
 
+    private static final Random random = new Random();
+
     @TempDir private Path tempDir;
     private FileStoreTable appendOnlyFileStoreTable;
     private SnapshotManager snapshotManager;
     private UnawareAppendTableCompactionCoordinator compactionCoordinator;
     private AppendOnlyFileStoreWrite write;
+    private org.apache.paimon.fs.Path path;
+    private TableSchema tableSchema;
     private final String commitUser = UUID.randomUUID().toString();
 
+    @BeforeEach
+    public void createNegativeAppendOnlyTable() throws Exception {
+        FileIO fileIO = new LocalFileIO();
+        path = new org.apache.paimon.fs.Path(tempDir.toString());
+        tableSchema = new SchemaManager(fileIO, path).createTable(schema());
+        snapshotManager = new SnapshotManager(fileIO, path);
+        recreate();
+    }
+
+    private void recreate() {
+        appendOnlyFileStoreTable =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new org.apache.paimon.fs.Path(tempDir.toString()),
+                        tableSchema);
+        compactionCoordinator =
+                new 
UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
+        write = (AppendOnlyFileStoreWrite) 
appendOnlyFileStoreTable.store().newWrite(commitUser);
+    }
+
     @Test
     public void noCompaction() throws Exception {
         List<CommitMessage> messages = writeCommit(10);
@@ -100,6 +125,15 @@ public class AppendOnlyTableCompactionTest {
         assertThat(last.get(0).rowCount()).isEqualTo(11);
     }
 
+    @Test
+    public void testScanSkipBigFiles() throws Exception {
+        List<CommitMessage> messages = writeCommit(11);
+        commit(messages);
+        tableSchema = tableSchema.copy(singletonMap("target-file-size", "1 
b"));
+        recreate();
+        assertThat(compactionCoordinator.plan()).isEmpty();
+    }
+
     @Test
     public void testCompactionLot() throws Exception {
         // test continuous compaction
@@ -178,21 +212,4 @@ public class AppendOnlyTableCompactionTest {
                 BinaryString.fromString("B" + random.nextInt(100)),
                 BinaryString.fromString("C" + random.nextInt(100)));
     }
-
-    private static final Random random = new Random();
-
-    @BeforeEach
-    public void createNegativeAppendOnlyTable() throws Exception {
-        FileIO fileIO = new LocalFileIO();
-        org.apache.paimon.fs.Path path = new 
org.apache.paimon.fs.Path(tempDir.toString());
-        SchemaManager schemaManager = new SchemaManager(fileIO, path);
-        TableSchema tableSchema = schemaManager.createTable(schema());
-        snapshotManager = new SnapshotManager(fileIO, path);
-        appendOnlyFileStoreTable =
-                FileStoreTableFactory.create(
-                        fileIO, new 
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
-        compactionCoordinator =
-                new 
UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
-        write = (AppendOnlyFileStoreWrite) 
appendOnlyFileStoreTable.store().newWrite(commitUser);
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
deleted file mode 100644
index c32e4fad0..000000000
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.table.system.BucketsTable;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link ContinuousAppendAndCompactFollowUpScanner}. */
-public class ContinuousAppendAndCompactFollowUpScannerTest extends ScannerTestBase {
-
-    private static final RowType ROW_TYPE =
-            RowType.of(
-                    new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
-                    new String[] {"a", "b", "c"});
-
-    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
-
-    @Test
-    public void testScan() throws Exception {
-        SnapshotManager snapshotManager = table.snapshotManager();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(2, 40, 400L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        List<CommitMessage> messageList = new ArrayList<>();
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        write.write(rowData(2, 40, 400L));
-        messageList.addAll(write.prepareCommit(true, 1));
-        write.write(rowData(1, 10, 100L));
-        write.write(rowData(1, 20, 200L));
-        messageList.addAll(write.prepareCommit(true, 1));
-        commit.commit(1, messageList);
-
-        write.compact(binaryRow(1), 0, true);
-        commit.commit(2, write.prepareCommit(true, 2));
-
-        write.close();
-        commit.close();
-
-        BucketsTable bucketsTable = new BucketsTable(table, true);
-        TableRead read = bucketsTable.newRead();
-        ContinuousAppendAndCompactFollowUpScanner scanner =
-                new ContinuousAppendAndCompactFollowUpScanner();
-
-        Snapshot snapshot = snapshotManager.snapshot(1);
-        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        TableScan.Plan plan = scanner.scan(snapshot, snapshotReader);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
-
-        snapshot = snapshotManager.snapshot(2);
-        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.scan(snapshot, snapshotReader);
-        assertThat(getResult(read, plan.splits()))
-                .hasSameElementsAs(Arrays.asList("+I 2|1|0|2", "+I 2|2|0|1"));
-
-        snapshot = snapshotManager.snapshot(3);
-        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
-        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.scan(snapshot, snapshotReader);
-        assertThat(getResult(read, plan.splits())).hasSameElementsAs(Arrays.asList("+I 3|1|0|1"));
-    }
-
-    @Override
-    protected String rowDataToString(InternalRow rowData) {
-        int numFiles;
-        try {
-            numFiles = dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        return String.format(
-                "%s %d|%d|%d|%d",
-                rowData.getRowKind().shortString(),
-                rowData.getLong(0),
-                deserializeBinaryRow(rowData.getBinary(1)).getInt(0),
-                rowData.getInt(2),
-                numFiles);
-    }
-
-    @Override
-    protected FileStoreTable createFileStoreTable(Options conf) throws Exception {
-        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
-        TableSchema tableSchema =
-                schemaManager.createTable(
-                        new Schema(
-                                ROW_TYPE.getFields(),
-                                Collections.singletonList("a"),
-                                Collections.emptyList(),
-                                conf.toMap(),
-                                ""));
-        return FileStoreTableFactory.create(
-                fileIO, tablePath, tableSchema, conf, CatalogEnvironment.empty());
-    }
-}


Reply via email to