This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b5089cf5d [core] ensure that record-expire takes effect when full compaction (#5255)
2b5089cf5d is described below

commit 2b5089cf5df2e36e6dc18420e0162e408dc3e0f4
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Mar 14 11:41:45 2025 +0800

    [core] ensure that record-expire takes effect when full compaction (#5255)
---
 .../org/apache/paimon/io/RecordLevelExpire.java    |  87 +++++++++-
 .../paimon/mergetree/compact/CompactStrategy.java  |  29 +++-
 .../mergetree/compact/MergeTreeCompactManager.java |  14 +-
 .../mergetree/compact/MergeTreeCompactTask.java    |  39 ++++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   5 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   6 +-
 .../compact/MergeTreeCompactManagerTest.java       |   9 +-
 .../apache/paimon/table/RecordLevelExpireTest.java | 177 +++++++++++++++++++++
 8 files changed, 342 insertions(+), 24 deletions(-)

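For readers skimming the diff: record-level expire drops individual rows whose time field is
older than a configured retention, and it only takes effect during compaction. Below is a
minimal sketch of enabling it through dynamic options, in the same way the new test at the
end of this diff copies the table; it assumes the option keys "record-level.expire-time" and
"record-level.time-field", and the column name "event_time" is only a placeholder:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.table.FileStoreTable;

    static FileStoreTable withRecordLevelExpire(FileStoreTable table) {
        Map<String, String> options = new HashMap<>();
        // rows whose time field is older than 7 days become candidates for removal during compaction
        options.put("record-level.expire-time", "7 d");
        // the INT/BIGINT/TIMESTAMP column holding each record's time ("event_time" is a placeholder)
        options.put("record-level.time-field", "event_time");
        return table.copy(options); // copy() applies the dynamic options to this table instance
    }
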
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 05c70dcbe1..e8cae827ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -22,6 +22,12 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.KeyValueFieldsExtractor;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStatsEvolution;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
@@ -30,20 +36,33 @@ import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
 
 /** A factory to create {@link RecordReader} expires records by time. */
 public class RecordLevelExpire {
 
+    private static final Logger LOG = LoggerFactory.getLogger(RecordLevelExpire.class);
+
     private final int expireTime;
     private final Function<InternalRow, Optional<Integer>> fieldGetter;
 
+    private final ConcurrentMap<Long, TableSchema> tableSchemas;
+    private final TableSchema schema;
+    private final SchemaManager schemaManager;
+    private final SimpleStatsEvolutions fieldValueStatsConverters;
+
     @Nullable
-    public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
+    public static RecordLevelExpire create(
+            CoreOptions options, TableSchema schema, SchemaManager schemaManager) {
         Duration expireTime = options.recordLevelExpireTime();
         if (expireTime == null) {
             return null;
@@ -56,6 +75,7 @@ public class RecordLevelExpire {
         }
 
+        // should not project here, record-level expire only works in compaction
+        RowType rowType = schema.logicalRowType();
         int fieldIndex = rowType.getFieldIndex(timeFieldName);
         if (fieldIndex == -1) {
             throw new IllegalArgumentException(
@@ -66,13 +86,69 @@ public class RecordLevelExpire {
         DataType dataType = rowType.getField(timeFieldName).type();
         Function<InternalRow, Optional<Integer>> fieldGetter =
                 createFieldGetter(dataType, fieldIndex);
-        return new RecordLevelExpire((int) expireTime.getSeconds(), fieldGetter);
+
+        LOG.info(
+                "Create RecordExpire. expireTime is {}s, timeField is {}",
+                (int) expireTime.getSeconds(),
+                timeFieldName);
+        return new RecordLevelExpire(
+                (int) expireTime.getSeconds(), fieldGetter, schema, schemaManager);
     }
 
     private RecordLevelExpire(
-            int expireTime, Function<InternalRow, Optional<Integer>> fieldGetter) {
+            int expireTime,
+            Function<InternalRow, Optional<Integer>> fieldGetter,
+            TableSchema schema,
+            SchemaManager schemaManager) {
         this.expireTime = expireTime;
         this.fieldGetter = fieldGetter;
+
+        this.tableSchemas = new ConcurrentHashMap<>();
+        this.schema = schema;
+        this.schemaManager = schemaManager;
+
+        KeyValueFieldsExtractor extractor =
+                PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
+
+        fieldValueStatsConverters =
+                new SimpleStatsEvolutions(
+                        sid -> extractor.valueFields(scanTableSchema(sid)), schema.id());
+    }
+
+    public boolean isExpireFile(DataFileMeta file) {
+        InternalRow minValues = file.valueStats().minValues();
+
+        if (file.schemaId() != schema.id() || file.valueStatsCols() != null) {
+            // In the following cases, we cannot read minValues by field index directly:
+            //
+            // 1. if the table has undergone schema evolution, reading minValues with the new
+            // field index may cause an exception.
+            // 2. if metadata.stats-dense-store = true, minValues may not contain all data fields,
+            // which may cause an exception when reading with the original field index
+            SimpleStatsEvolution.Result result =
+                    fieldValueStatsConverters
+                            .getOrCreate(file.schemaId())
+                            .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
+            minValues = result.minValues();
+        }
+
+        int currentTime = (int) (System.currentTimeMillis() / 1000);
+        Optional<Integer> minTime = fieldGetter.apply(minValues);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "expire time is {}, currentTime is {}, file min time for time field is {}. "
+                            + "file name is {}, file level is {}, file schema id is {}, file valueStatsCols is {}",
+                    expireTime,
+                    currentTime,
+                    minTime.isPresent() ? minTime.get() : "empty",
+                    file.fileName(),
+                    file.level(),
+                    file.schemaId(),
+                    file.valueStatsCols());
+        }
+
+        return minTime.map(minValue -> currentTime - expireTime > minValue).orElse(false);
     }
 
     public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
@@ -130,4 +206,9 @@ public class RecordLevelExpire {
 
         return fieldGetter;
     }
+
+    private TableSchema scanTableSchema(long id) {
+        return tableSchemas.computeIfAbsent(
+                id, key -> key == schema.id() ? schema : schemaManager.schema(id));
+    }
 }
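The heart of the change is the new isExpireFile above: it decides from the per-file min
statistics of the time field whether a data file may still hold expired rows, evolving those
stats first when the file was written under an older schema or with dense stats. A condensed,
hedged restatement of that check (names here are illustrative, not the committed API):

    import java.util.OptionalInt;

    // A file is a rewrite candidate only when the smallest time value it contains is already
    // past the retention cutoff; a missing statistic proves nothing, so the answer is false.
    static boolean mayContainExpiredRecords(OptionalInt minTimeSeconds, int expireTimeSeconds) {
        int nowSeconds = (int) (System.currentTimeMillis() / 1000);
        return minTimeSeconds.isPresent()
                && nowSeconds - expireTimeSeconds > minTimeSeconds.getAsInt();
    }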
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index 90471d1caf..23c09d3dc3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -19,8 +19,13 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.LevelSortedRun;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
@@ -39,11 +44,29 @@ public interface CompactStrategy {
     Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
 
     /** Pick a compaction unit consisting of all existing files. */
-    static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
+    static Optional<CompactUnit> pickFullCompaction(
+            int numLevels,
+            List<LevelSortedRun> runs,
+            @Nullable RecordLevelExpire recordLevelExpire) {
         int maxLevel = numLevels - 1;
-        if (runs.isEmpty() || (runs.size() == 1 && runs.get(0).level() == maxLevel)) {
-            // no sorted run or only 1 sorted run on the max level, no need to compact
+        if (runs.isEmpty()) {
+            // no sorted run, no need to compact
             return Optional.empty();
+        } else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
+            if (recordLevelExpire == null) {
+                // only 1 sorted run on the max level and don't check record-expire, no need to
+                // compact
+                return Optional.empty();
+            }
+
+            // pick the files which have expired records
+            List<DataFileMeta> filesContainExpireRecords = new ArrayList<>();
+            for (DataFileMeta file : runs.get(0).run().files()) {
+                if (recordLevelExpire.isExpireFile(file)) {
+                    filesContainExpireRecords.add(file);
+                }
+            }
+            return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));
         } else {
             return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
         }
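pickFullCompaction used to short-circuit whenever the tree already had a single sorted run on
the max level; it now keeps that shortcut only when record-level expire is disabled, otherwise
it still picks the max-level files whose statistics indicate expired rows. A hedged
illustration of that selection step (the helper name is made up):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.io.RecordLevelExpire;

    // From the lone max-level sorted run, keep only the files that still need rewriting because
    // they contain expired records; an empty list yields an effectively empty compaction unit.
    static List<DataFileMeta> pickExpiredFiles(List<DataFileMeta> maxLevelFiles, RecordLevelExpire expire) {
        List<DataFileMeta> picked = new ArrayList<>();
        for (DataFileMeta file : maxLevelFiles) {
            if (expire.isExpireFile(file)) {
                picked.add(file);
            }
        }
        return picked;
    }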
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f8d9a44e4e..a1b1954d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -65,6 +66,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
     private final boolean lazyGenDeletionFile;
     private final boolean needLookup;
 
+    @Nullable private final RecordLevelExpire recordLevelExpire;
+
     public MergeTreeCompactManager(
             ExecutorService executor,
             Levels levels,
@@ -76,7 +79,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
             @Nullable CompactionMetrics.Reporter metricsReporter,
             @Nullable DeletionVectorsMaintainer dvMaintainer,
             boolean lazyGenDeletionFile,
-            boolean needLookup) {
+            boolean needLookup,
+            @Nullable RecordLevelExpire recordLevelExpire) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -87,6 +91,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         this.metricsReporter = metricsReporter;
         this.dvMaintainer = dvMaintainer;
         this.lazyGenDeletionFile = lazyGenDeletionFile;
+        this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
 
         MetricUtils.safeCall(this::reportMetrics, LOG);
@@ -128,7 +133,9 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                         "Trigger forced full compaction. Picking from the following runs\n{}",
                         runs);
             }
-            optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
+            optionalUnit =
+                    CompactStrategy.pickFullCompaction(
+                            levels.numberOfLevels(), runs, recordLevelExpire);
         } else {
             if (taskFuture != null) {
                 return;
@@ -201,7 +208,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                         dropDelete,
                         levels.maxLevel(),
                         metricsReporter,
-                        compactDfSupplier);
+                        compactDfSupplier,
+                        recordLevelExpire);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Pick these files (name, level, size) for compaction: {}",
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index a1b64072d8..890302f4be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -24,6 +24,7 @@ import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
 
@@ -52,6 +53,8 @@ public class MergeTreeCompactTask extends CompactTask {
     // metric
     private int upgradeFilesNum;
 
+    @Nullable private final RecordLevelExpire recordLevelExpire;
+
     public MergeTreeCompactTask(
             Comparator<InternalRow> keyComparator,
             long minFileSize,
@@ -60,7 +63,8 @@ public class MergeTreeCompactTask extends CompactTask {
             boolean dropDelete,
             int maxLevel,
             @Nullable CompactionMetrics.Reporter metricsReporter,
-            Supplier<CompactDeletionFile> compactDfSupplier) {
+            Supplier<CompactDeletionFile> compactDfSupplier,
+            @Nullable RecordLevelExpire recordLevelExpire) {
         super(metricsReporter);
         this.minFileSize = minFileSize;
         this.rewriter = rewriter;
@@ -69,6 +73,7 @@ public class MergeTreeCompactTask extends CompactTask {
         this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
         this.dropDelete = dropDelete;
         this.maxLevel = maxLevel;
+        this.recordLevelExpire = recordLevelExpire;
 
         this.upgradeFilesNum = 0;
     }
@@ -117,19 +122,26 @@ public class MergeTreeCompactTask extends CompactTask {
 
     private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
         if (file.level() == outputLevel) {
+            if (isContainExpiredRecords(file)) {
+                // if the large file in maxLevel has expired records, we need to rewrite it
+                rewriteFile(file, toUpdate);
+            }
             return;
         }
 
         if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
-            CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
-            toUpdate.merge(upgradeResult);
-            upgradeFilesNum++;
+            if (isContainExpiredRecords(file)) {
+                // if the file which could be directly upgraded has expired records, we need to
+                // rewrite it
+                rewriteFile(file, toUpdate);
+            } else {
+                CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
+                toUpdate.merge(upgradeResult);
+                upgradeFilesNum++;
+            }
         } else {
             // files with delete records should not be upgraded directly to max level
-            List<List<SortedRun>> candidate = new ArrayList<>();
-            candidate.add(new ArrayList<>());
-            candidate.get(0).add(SortedRun.fromSingle(file));
-            rewriteImpl(candidate, toUpdate);
+            rewriteFile(file, toUpdate);
         }
     }
 
@@ -158,4 +170,15 @@ public class MergeTreeCompactTask extends CompactTask {
         toUpdate.merge(rewriteResult);
         candidate.clear();
     }
+
+    private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception {
+        List<List<SortedRun>> candidate = new ArrayList<>();
+        candidate.add(new ArrayList<>());
+        candidate.get(0).add(SortedRun.fromSingle(file));
+        rewriteImpl(candidate, toUpdate);
+    }
+
+    private boolean isContainExpiredRecords(DataFileMeta file) {
+        return recordLevelExpire != null && recordLevelExpire.isExpireFile(file);
+    }
 }
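In the compact task, files that previously moved to the output level without being rewritten
(already at the output level, or free of delete records) are now rewritten whenever
isExpireFile flags them, so their expired rows really disappear. A hedged summary of the new
decision in upgrade(), with invented names:

    enum UpgradeAction { KEEP, UPGRADE, REWRITE }

    // Simplified mirror of MergeTreeCompactTask#upgrade after this change (not the committed code).
    static UpgradeAction decide(
            boolean atOutputLevel, boolean upgradableWithoutRewrite, boolean containsExpiredRecords) {
        if (containsExpiredRecords) {
            return UpgradeAction.REWRITE; // expired rows must be dropped, so rewrite the file
        }
        if (atOutputLevel) {
            return UpgradeAction.KEEP; // nothing to do
        }
        // files with delete records still cannot be upgraded straight to the max level
        return upgradableWithoutRewrite ? UpgradeAction.UPGRADE : UpgradeAction.REWRITE;
    }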
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 97865e763f..c439112064 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -158,7 +158,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         pathFactory,
                         extractor,
                         options);
-        this.recordLevelExpire = RecordLevelExpire.create(options, valueType);
+        this.recordLevelExpire = RecordLevelExpire.create(options, schema, schemaManager);
         this.writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         fileIO,
@@ -281,7 +281,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                             : compactionMetrics.createReporter(partition, bucket),
                     dvMaintainer,
                     options.prepareCommitWaitCompaction(),
-                    options.needLookup());
+                    options.needLookup(),
+                    recordLevelExpire);
         }
     }
 
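The writer now builds RecordLevelExpire from the table schema plus a SchemaManager (rather
than just the value RowType), so statistics of files written under older schemas can be
evolved before the check, and it threads the instance into the compact manager. A hedged
usage sketch in the spirit of the new tests below (someDataFileMeta is a placeholder variable):

    // create() returns null when record-level expire is not configured for the table
    RecordLevelExpire expire =
            RecordLevelExpire.create(table.coreOptions(), table.schema(), table.schemaManager());
    if (expire != null) {
        boolean needsRewrite = expire.isExpireFile(someDataFileMeta);
    }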
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 01a9326aed..1207daa86c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -541,7 +541,8 @@ public abstract class MergeTreeTestBase {
                 null,
                 null,
                 false,
-                options.needLookup());
+                options.needLookup(),
+                null);
     }
 
     static class MockFailResultCompactionManager extends MergeTreeCompactManager {
@@ -564,7 +565,8 @@ public abstract class MergeTreeTestBase {
                     null,
                     null,
                     false,
-                    false);
+                    false,
+                    null);
         }
 
         protected CompactResult obtainCompactResult()
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index e7fa767b4b..4240555977 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -207,7 +207,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         null,
                         false,
-                        true);
+                        true,
+                        null);
 
         MergeTreeCompactManager defaultManager =
                 new MergeTreeCompactManager(
@@ -221,7 +222,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         null,
                         false,
-                        false);
+                        false,
+                        null);
 
         assertThat(lookupManager.compactNotCompleted()).isTrue();
         assertThat(defaultManager.compactNotCompleted()).isFalse();
@@ -256,7 +258,8 @@ public class MergeTreeCompactManagerTest {
                         null,
                         null,
                         false,
-                        false);
+                        false,
+                        null);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index bce57c2752..cb5c517973 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -26,8 +26,12 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -35,6 +39,10 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -114,4 +122,173 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
                         GenericRow.of(1, 4, currentSecs + 60 * 60),
                         GenericRow.of(1, 5, currentSecs + 60 * 60));
     }
+
+    @Test
+    public void testIsExpireFile() throws Exception {
+        CoreOptions coreOptions = table.coreOptions();
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        table = table.copy(dynamicOptions);
+
+        // common case
+        int currentSecs = (int) (System.currentTimeMillis() / 1000);
+        writeCommit(
+                GenericRow.of(1, 1, currentSecs + 60 * 60),
+                GenericRow.of(1, 2, currentSecs + 30 * 60),
+                GenericRow.of(1, 3, currentSecs - 60 * 60),
+                GenericRow.of(1, 4, currentSecs - 30 * 60));
+
+        writeCommit(
+                GenericRow.of(1, 5, currentSecs + 60 * 60),
+                GenericRow.of(1, 6, currentSecs + 30 * 60),
+                GenericRow.of(1, 7, currentSecs + 20 * 60),
+                GenericRow.of(1, 8, currentSecs + 10 * 60));
+
+        RecordLevelExpire recordLevelExpire =
+                RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+        List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits1.size()).isEqualTo(1);
+        List<DataFileMeta> files1 = splits1.get(0).dataFiles();
+        assertThat(files1.size()).isEqualTo(2);
+        assertThat(recordLevelExpire.isExpireFile(files1.get(0))).isTrue();
+        assertThat(recordLevelExpire.isExpireFile(files1.get(1))).isFalse();
+
+        // schema evolution
+        table.schemaManager()
+                .commitChanges(
+                        Collections.singletonList(
+                                SchemaChange.addColumn(
+                                        "col0",
+                                        DataTypes.INT(),
+                                        null,
+                                        SchemaChange.Move.after("col0", "pk"))));
+        refreshTable();
+
+        recordLevelExpire =
+                RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+        List<DataSplit> splits2 = table.newSnapshotReader().read().dataSplits();
+        List<DataFileMeta> files2 = splits2.get(0).dataFiles();
+        assertThat(recordLevelExpire.isExpireFile(files2.get(0))).isTrue();
+        assertThat(recordLevelExpire.isExpireFile(files2.get(1))).isFalse();
+
+        // metadata.stats-dense-store = true
+        dynamicOptions.put(CoreOptions.METADATA_STATS_DENSE_STORE.key(), "true");
+        dynamicOptions.put(CoreOptions.METADATA_STATS_MODE.key(), "none");
+        dynamicOptions.put("fields.col1.stats-mode", "full");
+        table = table.copy(dynamicOptions);
+
+        writeCommit(
+                GenericRow.of(1, 9, 9, currentSecs + 60 * 60),
+                GenericRow.of(1, 10, 10, currentSecs + 30 * 60));
+        writeCommit(
+                GenericRow.of(1, 11, 11, currentSecs + 60 * 60),
+                GenericRow.of(1, 12, 12, currentSecs - 30 * 60));
+
+        recordLevelExpire =
+                RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+        List<DataSplit> splits3 = table.newSnapshotReader().read().dataSplits();
+        List<DataFileMeta> files3 = splits3.get(0).dataFiles();
+        assertThat(recordLevelExpire.isExpireFile(files3.get(0))).isTrue();
+        assertThat(recordLevelExpire.isExpireFile(files3.get(1))).isFalse();
+        assertThat(recordLevelExpire.isExpireFile(files3.get(2))).isFalse();
+        assertThat(recordLevelExpire.isExpireFile(files3.get(3))).isTrue();
+
+        // schema evolution again, change the valueCols
+        table.schemaManager()
+                .commitChanges(
+                        Collections.singletonList(
+                                SchemaChange.addColumn(
+                                        "col2",
+                                        DataTypes.INT(),
+                                        null,
+                                        SchemaChange.Move.after("col2", "pk"))));
+        refreshTable();
+
+        // new files have no stats for record-level.time-field
+        dynamicOptions.put("fields.col1.stats-mode", "none");
+        dynamicOptions.put("fields.col2.stats-mode", "full");
+        table = table.copy(dynamicOptions);
+
+        writeCommit(
+                GenericRow.of(1, 13, 13, 13, currentSecs + 60 * 60),
+                GenericRow.of(1, 14, 14, 14, currentSecs + 30 * 60));
+        writeCommit(
+                GenericRow.of(1, 15, 15, 15, currentSecs + 60 * 60),
+                GenericRow.of(1, 16, 16, 16, currentSecs - 30 * 60));
+
+        recordLevelExpire =
+                RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+        List<DataSplit> splits4 = table.newSnapshotReader().read().dataSplits();
+        List<DataFileMeta> files4 = splits4.get(0).dataFiles();
+        // old files with record-level.time-field stats
+        assertThat(recordLevelExpire.isExpireFile(files4.get(0))).isTrue();
+        assertThat(recordLevelExpire.isExpireFile(files4.get(1))).isFalse();
+        assertThat(recordLevelExpire.isExpireFile(files4.get(2))).isFalse();
+        assertThat(recordLevelExpire.isExpireFile(files4.get(3))).isTrue();
+        // new files without record-level.time-field stats, cannot expire records
+        assertThat(recordLevelExpire.isExpireFile(files4.get(4))).isFalse();
+        assertThat(recordLevelExpire.isExpireFile(files4.get(5))).isFalse();
+    }
+
+    @Test
+    public void testTotallyExpire() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1500 B");
+        table = table.copy(map);
+
+        int currentSecs = (int) (System.currentTimeMillis() / 1000);
+        // if seconds is too short, this test might fail
+        int seconds = 5;
+
+        // large file A. It has no delete records or expired records, so it will be upgraded to
+        // maxLevel without rewriting during full compaction
+        writeCommit(
+                GenericRow.of(1, 1, currentSecs + 60 * 60),
+                GenericRow.of(1, 2, currentSecs + seconds));
+        compact(1);
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, currentSecs + 60 * 60),
+                        GenericRow.of(1, 2, currentSecs + seconds));
+
+        // large file B. It has no delete records but has expired records
+        writeCommit(
+                GenericRow.of(1, 3, currentSecs + 60 * 60),
+                GenericRow.of(1, 4, currentSecs - 60 * 60));
+        // no full compaction, expired records can be queried
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, currentSecs + 60 * 60),
+                        GenericRow.of(1, 2, currentSecs + seconds),
+                        GenericRow.of(1, 3, currentSecs + 60 * 60),
+                        GenericRow.of(1, 4, currentSecs - 60 * 60));
+        compact(1);
+        List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits1.size()).isEqualTo(1);
+        assertThat(splits1.get(0).dataFiles().size()).isEqualTo(2);
+        // full compaction, expired records will be removed
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, currentSecs + 60 * 60),
+                        GenericRow.of(1, 2, currentSecs + seconds),
+                        GenericRow.of(1, 3, currentSecs + 60 * 60));
+
+        // ensure (1, 2, currentSecs + seconds) is out of date
+        Thread.sleep(seconds * 1000 + 2000);
+        compact(1);
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, currentSecs + 60 * 60),
+                        GenericRow.of(1, 3, currentSecs + 60 * 60));
+    }
+
+    private void refreshTable() throws Catalog.TableNotExistException {
+        CatalogContext context =
+                CatalogContext.create(
+                        new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        Identifier identifier = new Identifier("default", "T");
+
+        table = (FileStoreTable) catalog.getTable(identifier);
+    }
 }
