This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb7c1ddd [core] Introduce record-level expire time (#3293)
fcb7c1ddd is described below

commit fcb7c1ddd0294efb72e3b4dd02922d0d0429b7e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:42:51 2024 +0800

    [core] Introduce record-level expire time (#3293)
---
 docs/content/primary-key-table/overview.md         |  8 ++
 .../shortcodes/generated/core_configuration.html   | 12 +++
 .../main/java/org/apache/paimon/CoreOptions.java   | 27 +++++++
 .../org/apache/paimon/io/FileReaderFactory.java    | 29 +++++++
 .../paimon/io/KeyValueFileReaderFactory.java       |  7 +-
 .../org/apache/paimon/io/RecordLevelExpire.java    | 92 ++++++++++++++++++++++
 .../apache/paimon/mergetree/MergeTreeReaders.java  | 16 ++--
 .../compact/ChangelogMergeTreeRewriter.java        |  4 +-
 .../FullChangelogMergeTreeCompactRewriter.java     |  4 +-
 .../compact/LookupMergeTreeCompactRewriter.java    |  4 +-
 .../compact/MergeTreeCompactRewriter.java          |  6 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   | 17 ++--
 .../paimon/catalog/PrimaryKeyTableTestBase.java    | 47 +++++++++++
 .../apache/paimon/table/RecordLevelExpireTest.java | 70 ++++++++++++++++
 14 files changed, 316 insertions(+), 27 deletions(-)

diff --git a/docs/content/primary-key-table/overview.md 
b/docs/content/primary-key-table/overview.md
index f622ff093..13d1098b2 100644
--- a/docs/content/primary-key-table/overview.md
+++ b/docs/content/primary-key-table/overview.md
@@ -69,3 +69,11 @@ To limit the number of sorted runs, we have to merge several 
sorted runs into on
 However, compaction is a resource intensive procedure which consumes a certain 
amount of CPU time and disk IO, so too frequent compaction may in turn result 
in slower writes. It is a trade-off between query and write performance. Paimon 
currently adapts a compaction strategy similar to Rocksdb's [universal 
compaction](https://github.com/facebook/rocksdb/wiki/Universal-Compaction).
 
 By default, when Paimon appends records to the LSM tree, it will also perform 
compactions as needed. Users can also choose to perform all compactions in a 
dedicated compaction job. See [dedicated compaction job]({{< ref 
"maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
+
+### Record-Level expire
+
+To expire individual records during compaction, configure the following options:
+1. `'record-level.expire-time'`: retention time for records.
+2. `'record-level.time-field'`: time field for record-level expire; it must be an INT column holding the time in seconds.
+
+Expiration happens in compaction, and there is no strong guarantee to expire 
records in time.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 713e55e10..892fce56d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -473,6 +473,18 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Integer</td>
             <td>Read batch size for orc and parquet.</td>
         </tr>
+        <tr>
+            <td><h5>record-level.expire-time</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Record level expire time for primary key table, expiration 
happens in compaction, there is no strong guarantee to expire records in time. 
You must specific 'record-level.time-field' too.</td>
+        </tr>
+        <tr>
+            <td><h5>record-level.time-field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Time field for record level expire, it should be a seconds 
INT.</td>
+        </tr>
         <tr>
             <td><h5>rowkind.field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b75d045e8..63832638b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1127,6 +1127,23 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1000)
                     .withDescription(
                             "The magnification of local sample for 
sort-compaction.The size of local sample is sink parallelism * magnification.");
+
+    public static final ConfigOption<Duration> RECORD_LEVEL_EXPIRE_TIME =
+            key("record-level.expire-time")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Record level expire time for primary key table, 
expiration happens in compaction, "
+                                    + "there is no strong guarantee to expire 
records in time. "
+                                    + "You must specific 
'record-level.time-field' too.");
+
+    public static final ConfigOption<String> RECORD_LEVEL_TIME_FIELD =
+            key("record-level.time-field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Time field for record level expire, it should be 
a seconds INT.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1761,6 +1778,16 @@ public class CoreOptions implements Serializable {
         return options.get(DELETION_FORCE_PRODUCE_CHANGELOG);
     }
 
+    @Nullable
+    public Duration recordLevelExpireTime() { // value of 'record-level.expire-time'
+        return options.get(RECORD_LEVEL_EXPIRE_TIME); // option is declared without a default
+    }
+
+    @Nullable
+    public String recordLevelTimeField() { // value of 'record-level.time-field'
+        return options.get(RECORD_LEVEL_TIME_FIELD); // option is declared without a default
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java
new file mode 100644
index 000000000..6fc2b85ba
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.reader.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * Factory to read records from file.
+ *
+ * @param <T> type of the records returned by the created readers
+ */
+public interface FileReaderFactory<T> {
+    /**
+     * Creates a reader for the records of the given data file.
+     *
+     * @param file metadata identifying the file to read
+     * @return a reader producing the records of {@code file}
+     * @throws IOException declared so that implementations may report I/O failures
+     */
+    RecordReader<T> createRecordReader(DataFileMeta file) throws IOException;
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index e7d091b47..2a19cbdca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -53,7 +53,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 /** Factory to create {@link RecordReader}s for reading {@link KeyValue} 
files. */
-public class KeyValueFileReaderFactory {
+public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
 
     private final FileIO fileIO;
     private final SchemaManager schemaManager;
@@ -95,6 +95,11 @@ public class KeyValueFileReaderFactory {
         this.ignoreDelete = 
CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
+    @Override
+    public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws 
IOException {
+        return createRecordReader(file.schemaId(), file.fileName(), 
file.fileSize(), file.level());
+    }
+
     public RecordReader<KeyValue> createRecordReader(
             long schemaId, String fileName, long fileSize, int level) throws 
IOException {
         if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
new file mode 100644
index 000000000..a49e31fc9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Wraps readers of {@link KeyValue} files so that records whose time field is older than the
+ * configured expire time are filtered out.
+ *
+ * <p>A record is kept while {@code now - recordTime <= expireTime}. Both the record time (an INT
+ * field) and {@code now} are in seconds; {@code now} is the wall clock captured once, when a
+ * reader is wrapped. All arithmetic is done in {@code long}, so large expire times can neither be
+ * truncated nor overflow and drop live records.
+ */
+public class RecordLevelExpire {
+
+    /** Position of the time field in the value row. */
+    private final int timeField;
+
+    /** Retention in seconds. */
+    private final long expireTime;
+
+    /**
+     * Builds the expiration from table options.
+     *
+     * @param options table options providing the expire time and the name of the time field
+     * @param rowType row type used to resolve the time field
+     * @return the expiration, or {@code null} when no expire time is configured
+     * @throws IllegalArgumentException if the time field is not configured, cannot be found in
+     *     {@code rowType}, or is not of INT type
+     */
+    @Nullable
+    public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
+        Duration expireTime = options.recordLevelExpireTime();
+        if (expireTime == null) {
+            return null;
+        }
+
+        String timeField = options.recordLevelTimeField();
+        if (timeField == null) {
+            throw new IllegalArgumentException(
+                    "You should set time field for record-level expire.");
+        }
+
+        // no projection is applied here: record level expire only works in compaction
+        int fieldIndex = rowType.getFieldIndex(timeField);
+        if (fieldIndex == -1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Can not find time field %s for record level expire.", timeField));
+        }
+
+        DataField field = rowType.getField(timeField);
+        if (!(field.type() instanceof IntType)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Record level time field should be INT type, but is %s.",
+                            field.type()));
+        }
+
+        // keep the full long value: narrowing to int would silently corrupt large durations
+        return new RecordLevelExpire(fieldIndex, expireTime.getSeconds());
+    }
+
+    /**
+     * @param timeField position of the time field in the value row
+     * @param expireTime retention in seconds
+     */
+    public RecordLevelExpire(int timeField, int expireTime) {
+        this(timeField, (long) expireTime);
+    }
+
+    /**
+     * @param timeField position of the time field in the value row
+     * @param expireTime retention in seconds
+     */
+    public RecordLevelExpire(int timeField, long expireTime) {
+        this.timeField = timeField;
+        this.expireTime = expireTime;
+    }
+
+    /** Returns a factory whose readers are the readers of {@code readerFactory}, filtered. */
+    public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
+        return file -> wrap(readerFactory.createRecordReader(file));
+    }
+
+    /**
+     * Returns a view of {@code reader} that skips expired records.
+     *
+     * <p>The filter fails with an {@link IllegalArgumentException} on a record whose time field
+     * is null.
+     */
+    public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+        // captured once, so every record of this reader is judged against the same instant
+        long currentTime = System.currentTimeMillis() / 1000;
+        return reader.filter(
+                kv -> {
+                    checkArgument(
+                            !kv.value().isNullAt(timeField),
+                            "Time field for record-level expire should not be null.");
+                    long recordTime = kv.value().getInt(timeField);
+                    // same predicate as "currentTime <= recordTime + expireTime", but the
+                    // subtraction of two second-resolution values cannot overflow a long
+                    return currentTime - recordTime <= expireTime;
+                });
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 866e2d33c..766dccca2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -21,7 +21,7 @@ package org.apache.paimon.mergetree;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -42,7 +42,7 @@ public class MergeTreeReaders {
 
     public static <T> RecordReader<T> readerForMergeTree(
             List<List<SortedRun>> sections,
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             Comparator<InternalRow> userKeyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionWrapper<T> mergeFunctionWrapper,
@@ -65,7 +65,7 @@ public class MergeTreeReaders {
 
     public static <T> RecordReader<T> readerForSection(
             List<SortedRun> section,
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             Comparator<InternalRow> userKeyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionWrapper<T> mergeFunctionWrapper,
@@ -80,16 +80,10 @@ public class MergeTreeReaders {
     }
 
     private static RecordReader<KeyValue> readerForRun(
-            SortedRun run, KeyValueFileReaderFactory readerFactory) throws 
IOException {
+            SortedRun run, FileReaderFactory<KeyValue> readerFactory) throws 
IOException {
         List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (DataFileMeta file : run.files()) {
-            readers.add(
-                    () ->
-                            readerFactory.createRecordReader(
-                                    file.schemaId(),
-                                    file.fileName(),
-                                    file.fileSize(),
-                                    file.level()));
+            readers.add(() -> readerFactory.createRecordReader(file));
         }
         return ConcatRecordReader.create(readers);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 98553030d..a1aabc4e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.mergetree.MergeSorter;
@@ -51,7 +51,7 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
     public ChangelogMergeTreeRewriter(
             int maxLevel,
             MergeEngine mergeEngine,
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 7283cbb7b..56a13dfc7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
@@ -48,7 +48,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
     public FullChangelogMergeTreeCompactRewriter(
             int maxLevel,
             CoreOptions.MergeEngine mergeEngine,
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index fea64539b..ca7069455 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels;
@@ -58,7 +58,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             int maxLevel,
             MergeEngine mergeEngine,
             LookupLevels<T> lookupLevels,
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 4cfe00f3f..d399d599d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -22,7 +22,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.mergetree.DropDeleteReader;
@@ -42,7 +42,7 @@ import java.util.List;
 /** Default {@link CompactRewriter} for merge trees. */
 public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
 
-    protected final KeyValueFileReaderFactory readerFactory;
+    protected final FileReaderFactory<KeyValue> readerFactory;
     protected final KeyValueFileWriterFactory writerFactory;
     protected final Comparator<InternalRow> keyComparator;
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
@@ -50,7 +50,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     protected final MergeSorter mergeSorter;
 
     public MergeTreeCompactRewriter(
-            KeyValueFileReaderFactory readerFactory,
+            FileReaderFactory<KeyValue> readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 8343ba8b8..314019a60 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,8 +35,10 @@ import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
@@ -100,6 +102,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final FileIO fileIO;
     private final RowType keyType;
     private final RowType valueType;
+    @Nullable private final RecordLevelExpire recordLevelExpire;
 
     public KeyValueFileStoreWrite(
             FileIO fileIO,
@@ -144,6 +147,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         pathFactory,
                         extractor,
                         options);
+        this.recordLevelExpire = RecordLevelExpire.create(options, valueType);
         this.writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         fileIO,
@@ -258,8 +262,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             Levels levels,
             @Nullable DeletionVectorsMaintainer dvMaintainer) {
         DeletionVector.Factory dvFactory = 
DeletionVector.factory(dvMaintainer);
-        KeyValueFileReaderFactory readerFactory =
+        FileReaderFactory<KeyValue> readerFactory =
                 readerFactoryBuilder.build(partition, bucket, dvFactory);
+        if (recordLevelExpire != null) {
+            readerFactory = recordLevelExpire.wrap(readerFactory);
+        }
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, 
ioManager);
@@ -282,7 +289,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             LookupStrategy lookupStrategy = options.lookupStrategy();
             LookupLevels.ValueProcessor<?> processor;
             LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> 
wrapperFactory;
-            KeyValueFileReaderFactory lookupReaderFactory = readerFactory;
+            FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
             if (mergeEngine == FIRST_ROW) {
                 if (options.deletionVectorsEnabled()) {
                     throw new UnsupportedOperationException(
@@ -337,7 +344,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private <T> LookupLevels<T> createLookupLevels(
             Levels levels,
             LookupLevels.ValueProcessor<T> valueProcessor,
-            KeyValueFileReaderFactory readerFactory) {
+            FileReaderFactory<KeyValue> readerFactory) {
         if (ioManager == null) {
             throw new RuntimeException(
                     "Can not use lookup, there is no temp disk directory to 
use.");
@@ -348,9 +355,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 keyComparatorSupplier.get(),
                 keyType,
                 valueProcessor,
-                file ->
-                        readerFactory.createRecordReader(
-                                file.schemaId(), file.fileName(), 
file.fileSize(), file.level()),
+                readerFactory::createRecordReader,
                 () -> ioManager.createChannel().getPathFile(),
                 new HashLookupStoreFactory(
                         cacheManager,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
index 4c334e5bf..54152c896 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -32,6 +38,8 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.function.Predicate;
 
@@ -82,4 +90,43 @@ public abstract class PrimaryKeyTableTestBase {
     protected static long utcMills(String timestamp) {
         return 
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
     }
+
+    /**
+     * Writes the given rows through a new batch writer and commits the result.
+     *
+     * @param rows rows to write, in the given order
+     * @throws Exception if writing, committing or closing the writer fails
+     */
+    protected void writeCommit(InternalRow... rows) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        try {
+            for (InternalRow row : rows) {
+                write.write(row);
+            }
+            writeBuilder.newCommit().commit(write.prepareCommit());
+        } finally {
+            // close the writer even when write/commit throws, so a failing test does not leak it
+            write.close();
+        }
+    }
+
+    /**
+     * Runs a compaction on bucket 0 of the given partition (third argument of {@code
+     * write.compact} is {@code true}) and commits the result.
+     *
+     * @param partition value of the single partition column
+     * @throws Exception if compacting, committing or closing the writer fails
+     */
+    protected void compact(int partition) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        try {
+            write.compact(BinaryRow.singleColumn(partition), 0, true);
+            writeBuilder.newCommit().commit(write.prepareCommit());
+        } finally {
+            // close the writer even when compaction/commit throws, so it is never leaked
+            write.close();
+        }
+    }
+
+    /** Reads the table projecting the columns at positions 0, 1 and 2. */
+    protected List<GenericRow> query() throws Exception {
+        int[] firstThreeColumns = {0, 1, 2};
+        return query(firstThreeColumns);
+    }
+
+    /**
+     * Reads the table with the given projection and copies every row into a {@link GenericRow}.
+     *
+     * <p>Each projected column is read with {@code getInt}.
+     *
+     * @param projection positions of the columns to read
+     * @return one {@link GenericRow} of {@code projection.length} fields per row read
+     */
+    protected List<GenericRow> query(int[] projection) throws Exception {
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection);
+        List<GenericRow> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            int arity = projection.length;
+                            GenericRow copy = new GenericRow(arity);
+                            for (int pos = 0; pos < arity; pos++) {
+                                copy.setField(pos, row.getInt(pos));
+                            }
+                            result.add(copy);
+                        });
+        return result;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
new file mode 100644
index 000000000..367a30b66
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks that expired records stay readable until a compaction drops them. */
+class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
+
+    @Override
+    protected Options tableOptions() {
+        Options tableOptions = new Options();
+        tableOptions.set(CoreOptions.BUCKET, 1);
+        tableOptions.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
+        tableOptions.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+        return tableOptions;
+    }
+
+    @Test
+    public void test() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
+
+        // nothing has been compacted yet, so both rows can be queried
+        assertThat(query())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
+
+        int currentSecs = (int) (System.currentTimeMillis() / 1000);
+        int oneHourLater = currentSecs + 60 * 60;
+        writeCommit(GenericRow.of(1, 3, currentSecs));
+        writeCommit(GenericRow.of(1, 4, oneHourLater));
+        // wait longer than the 1 second expire time configured in tableOptions()
+        Thread.sleep(2000);
+
+        // still no compaction: every row written so far can be queried
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 1),
+                        GenericRow.of(1, 2, 2),
+                        GenericRow.of(1, 3, currentSecs),
+                        GenericRow.of(1, 4, oneHourLater));
+
+        // after compaction only the row whose time lies in the future is left
+        compact(1);
+        assertThat(query()).containsExactlyInAnyOrder(GenericRow.of(1, 4, oneHourLater));
+        assertThat(query(new int[] {2})).containsExactlyInAnyOrder(GenericRow.of(oneHourLater));
+    }
+}

Reply via email to