This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fcb7c1ddd [core] Introduce record-level expire time (#3293)
fcb7c1ddd is described below
commit fcb7c1ddd0294efb72e3b4dd02922d0d0429b7e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:42:51 2024 +0800
[core] Introduce record-level expire time (#3293)
---
docs/content/primary-key-table/overview.md | 8 ++
.../shortcodes/generated/core_configuration.html | 12 +++
.../main/java/org/apache/paimon/CoreOptions.java | 27 +++++++
.../org/apache/paimon/io/FileReaderFactory.java | 29 +++++++
.../paimon/io/KeyValueFileReaderFactory.java | 7 +-
.../org/apache/paimon/io/RecordLevelExpire.java | 92 ++++++++++++++++++++++
.../apache/paimon/mergetree/MergeTreeReaders.java | 16 ++--
.../compact/ChangelogMergeTreeRewriter.java | 4 +-
.../FullChangelogMergeTreeCompactRewriter.java | 4 +-
.../compact/LookupMergeTreeCompactRewriter.java | 4 +-
.../compact/MergeTreeCompactRewriter.java | 6 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 17 ++--
.../paimon/catalog/PrimaryKeyTableTestBase.java | 47 +++++++++++
.../apache/paimon/table/RecordLevelExpireTest.java | 70 ++++++++++++++++
14 files changed, 316 insertions(+), 27 deletions(-)
diff --git a/docs/content/primary-key-table/overview.md
b/docs/content/primary-key-table/overview.md
index f622ff093..13d1098b2 100644
--- a/docs/content/primary-key-table/overview.md
+++ b/docs/content/primary-key-table/overview.md
@@ -69,3 +69,11 @@ To limit the number of sorted runs, we have to merge several
sorted runs into on
However, compaction is a resource intensive procedure which consumes a certain
amount of CPU time and disk IO, so too frequent compaction may in turn result
in slower writes. It is a trade-off between query and write performance. Paimon
currently adapts a compaction strategy similar to Rocksdb's [universal
compaction](https://github.com/facebook/rocksdb/wiki/Universal-Compaction).
By default, when Paimon appends records to the LSM tree, it will also perform
compactions as needed. Users can also choose to perform all compactions in a
dedicated compaction job. See [dedicated compaction job]({{< ref
"maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
+
+### Record-Level expire
+
+To expire records during compaction, configure record-level expiration with the following options:
+1. `'record-level.expire-time'`: how long records are retained.
+2. `'record-level.time-field'`: the field that holds each record's time; it must be an INT containing epoch seconds.
+
+Expiration happens in compaction, and there is no strong guarantee to expire
records in time.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 713e55e10..892fce56d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -473,6 +473,18 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Integer</td>
<td>Read batch size for orc and parquet.</td>
</tr>
+ <tr>
+ <td><h5>record-level.expire-time</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Record level expire time for primary key table, expiration
happens in compaction, there is no strong guarantee to expire records in time.
You must specific 'record-level.time-field' too.</td>
+ </tr>
+ <tr>
+ <td><h5>record-level.time-field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Time field for record level expire, it should be a seconds
INT.</td>
+ </tr>
<tr>
<td><h5>rowkind.field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b75d045e8..63832638b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1127,6 +1127,23 @@ public class CoreOptions implements Serializable {
.defaultValue(1000)
.withDescription(
"The magnification of local sample for
sort-compaction.The size of local sample is sink parallelism * magnification.");
+
+ public static final ConfigOption<Duration> RECORD_LEVEL_EXPIRE_TIME = // Duration option, declared without a default; must be paired with 'record-level.time-field'
+ key("record-level.expire-time")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Record level expire time for primary key table,
expiration happens in compaction, "
+ + "there is no strong guarantee to expire
records in time. "
+ + "You must specific
'record-level.time-field' too.");
+
+ public static final ConfigOption<String> RECORD_LEVEL_TIME_FIELD = // name of the INT (seconds) column used for record-level expire; declared without a default
+ key("record-level.time-field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Time field for record level expire, it should be
a seconds INT.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -1761,6 +1778,16 @@ public class CoreOptions implements Serializable {
return options.get(DELETION_FORCE_PRODUCE_CHANGELOG);
}
+ @Nullable // the option is declared with noDefaultValue(), so an unset option yields null
+ public Duration recordLevelExpireTime() { // returns the configured 'record-level.expire-time'
+ return options.get(RECORD_LEVEL_EXPIRE_TIME);
+ }
+
+ @Nullable // the option is declared with noDefaultValue(), so an unset option yields null
+ public String recordLevelTimeField() { // returns the configured 'record-level.time-field' column name
+ return options.get(RECORD_LEVEL_TIME_FIELD);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java
new file mode 100644
index 000000000..6fc2b85ba
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileReaderFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.reader.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * Factory to read records from file.
+ *
+ * <p>Abstracts over concrete reader factories so that callers only depend on "data file in,
+ * record reader out", which also allows a factory to be wrapped by another one.
+ *
+ * @param <T> type of the records produced by the created readers
+ */
+public interface FileReaderFactory<T> {
+
+ RecordReader<T> createRecordReader(DataFileMeta file) throws IOException; // creates a reader over the given data file; may fail with IOException
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index e7d091b47..2a19cbdca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -53,7 +53,7 @@ import java.util.Optional;
import java.util.function.Supplier;
/** Factory to create {@link RecordReader}s for reading {@link KeyValue}
files. */
-public class KeyValueFileReaderFactory {
+public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final FileIO fileIO;
private final SchemaManager schemaManager;
@@ -95,6 +95,11 @@ public class KeyValueFileReaderFactory {
this.ignoreDelete =
CoreOptions.fromMap(schema.options()).ignoreDelete();
}
+ @Override
+ public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws
IOException {
+ return createRecordReader(file.schemaId(), file.fileName(),
file.fileSize(), file.level());
+ }
+
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws
IOException {
if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
new file mode 100644
index 000000000..a49e31fc9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Wraps {@link RecordReader}s (or factories creating them) so that records whose time field is
+ * older than the configured expire time are filtered out.
+ *
+ * <p>The time field must be a non-null INT column holding seconds since the epoch; it is compared
+ * against {@code System.currentTimeMillis() / 1000}. All time arithmetic is done in {@code long}
+ * so that a long expire time, or a record time close to {@link Integer#MAX_VALUE}, cannot
+ * overflow and silently expire records that are still live.
+ */
+public class RecordLevelExpire {
+
+    /** Index of the time field inside the value row. */
+    private final int timeField;
+
+    /** Retention in seconds, kept as long to avoid narrowing the configured duration. */
+    private final long expireTimeSeconds;
+
+    /**
+     * Creates a {@link RecordLevelExpire} from the table options.
+     *
+     * @param options table options carrying the expire time and the time field name
+     * @param rowType value row type in which the time field is looked up
+     * @return the expire helper, or {@code null} if no record-level expire time is configured
+     * @throws IllegalArgumentException if the time field is not set, cannot be found in {@code
+     *     rowType}, or is not of INT type
+     */
+    @Nullable
+    public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
+        Duration expireTime = options.recordLevelExpireTime();
+        if (expireTime == null) {
+            return null;
+        }
+
+        String timeField = options.recordLevelTimeField();
+        if (timeField == null) {
+            throw new IllegalArgumentException(
+                    "You should set time field for record-level expire.");
+        }
+
+        // should no project here, record level expire only works in compaction
+        int fieldIndex = rowType.getFieldIndex(timeField);
+        if (fieldIndex == -1) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Can not find time field %s for record level expire.", timeField));
+        }
+
+        DataField field = rowType.getField(timeField);
+        if (!(field.type() instanceof IntType)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Record level time field should be INT type, but is %s.",
+                            field.type()));
+        }
+
+        // Keep the full long value: casting to int would wrap for durations above ~68 years.
+        return new RecordLevelExpire(fieldIndex, expireTime.getSeconds());
+    }
+
+    /**
+     * @param timeField index of the INT time field in the value row
+     * @param expireTime retention in seconds
+     */
+    public RecordLevelExpire(int timeField, int expireTime) {
+        this(timeField, (long) expireTime);
+    }
+
+    private RecordLevelExpire(int timeField, long expireTimeSeconds) {
+        this.timeField = timeField;
+        this.expireTimeSeconds = expireTimeSeconds;
+    }
+
+    /** Returns a factory whose readers are all wrapped by {@link #wrap(RecordReader)}. */
+    public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
+        return file -> wrap(readerFactory.createRecordReader(file));
+    }
+
+    /**
+     * Returns a reader that only keeps records which are not expired yet. The current time is
+     * captured once, when the reader is wrapped.
+     *
+     * @throws IllegalArgumentException (while reading) if a record has a null time field
+     */
+    public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+        long currentTime = System.currentTimeMillis() / 1000;
+        return reader.filter(
+                kv -> {
+                    checkArgument(
+                            !kv.value().isNullAt(timeField),
+                            "Time field for record-level expire should not be null.");
+                    long recordTime = kv.value().getInt(timeField);
+                    // Same condition as currentTime <= recordTime + expireTime, written as a
+                    // difference of two bounded values so it cannot overflow.
+                    return currentTime - recordTime <= expireTimeSeconds;
+                });
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 866e2d33c..766dccca2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -21,7 +21,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -42,7 +42,7 @@ public class MergeTreeReaders {
public static <T> RecordReader<T> readerForMergeTree(
List<List<SortedRun>> sections,
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
Comparator<InternalRow> userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper<T> mergeFunctionWrapper,
@@ -65,7 +65,7 @@ public class MergeTreeReaders {
public static <T> RecordReader<T> readerForSection(
List<SortedRun> section,
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
Comparator<InternalRow> userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper<T> mergeFunctionWrapper,
@@ -80,16 +80,10 @@ public class MergeTreeReaders {
}
private static RecordReader<KeyValue> readerForRun(
- SortedRun run, KeyValueFileReaderFactory readerFactory) throws
IOException {
+ SortedRun run, FileReaderFactory<KeyValue> readerFactory) throws
IOException {
List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (DataFileMeta file : run.files()) {
- readers.add(
- () ->
- readerFactory.createRecordReader(
- file.schemaId(),
- file.fileName(),
- file.fileSize(),
- file.level()));
+ readers.add(() -> readerFactory.createRecordReader(file));
}
return ConcatRecordReader.create(readers);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 98553030d..a1aabc4e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.mergetree.MergeSorter;
@@ -51,7 +51,7 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
public ChangelogMergeTreeRewriter(
int maxLevel,
MergeEngine mergeEngine,
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 7283cbb7b..56a13dfc7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
@@ -48,7 +48,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
public FullChangelogMergeTreeCompactRewriter(
int maxLevel,
CoreOptions.MergeEngine mergeEngine,
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index fea64539b..ca7069455 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.LookupLevels;
@@ -58,7 +58,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
int maxLevel,
MergeEngine mergeEngine,
LookupLevels<T> lookupLevels,
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 4cfe00f3f..d399d599d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -22,7 +22,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.mergetree.DropDeleteReader;
@@ -42,7 +42,7 @@ import java.util.List;
/** Default {@link CompactRewriter} for merge trees. */
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
- protected final KeyValueFileReaderFactory readerFactory;
+ protected final FileReaderFactory<KeyValue> readerFactory;
protected final KeyValueFileWriterFactory writerFactory;
protected final Comparator<InternalRow> keyComparator;
@Nullable protected final FieldsComparator userDefinedSeqComparator;
@@ -50,7 +50,7 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
protected final MergeSorter mergeSorter;
public MergeTreeCompactRewriter(
- KeyValueFileReaderFactory readerFactory,
+ FileReaderFactory<KeyValue> readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 8343ba8b8..314019a60 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,8 +35,10 @@ import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
@@ -100,6 +102,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final FileIO fileIO;
private final RowType keyType;
private final RowType valueType;
+ @Nullable private final RecordLevelExpire recordLevelExpire;
public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -144,6 +147,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
pathFactory,
extractor,
options);
+ this.recordLevelExpire = RecordLevelExpire.create(options, valueType);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
@@ -258,8 +262,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
DeletionVector.Factory dvFactory =
DeletionVector.factory(dvMaintainer);
- KeyValueFileReaderFactory readerFactory =
+ FileReaderFactory<KeyValue> readerFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
+ if (recordLevelExpire != null) {
+ readerFactory = recordLevelExpire.wrap(readerFactory);
+ }
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType,
ioManager);
@@ -282,7 +289,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
LookupStrategy lookupStrategy = options.lookupStrategy();
LookupLevels.ValueProcessor<?> processor;
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?>
wrapperFactory;
- KeyValueFileReaderFactory lookupReaderFactory = readerFactory;
+ FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
if (mergeEngine == FIRST_ROW) {
if (options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
@@ -337,7 +344,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private <T> LookupLevels<T> createLookupLevels(
Levels levels,
LookupLevels.ValueProcessor<T> valueProcessor,
- KeyValueFileReaderFactory readerFactory) {
+ FileReaderFactory<KeyValue> readerFactory) {
if (ioManager == null) {
throw new RuntimeException(
"Can not use lookup, there is no temp disk directory to
use.");
@@ -348,9 +355,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyComparatorSupplier.get(),
keyType,
valueProcessor,
- file ->
- readerFactory.createRecordReader(
- file.schemaId(), file.fileName(),
file.fileSize(), file.level()),
+ readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
new HashLookupStoreFactory(
cacheManager,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
index 4c334e5bf..54152c896 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -18,11 +18,17 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;
@@ -32,6 +38,8 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
@@ -82,4 +90,43 @@ public abstract class PrimaryKeyTableTestBase {
protected static long utcMills(String timestamp) {
return
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
}
+
+    /**
+     * Writes the given rows with a fresh batch write and commits them.
+     *
+     * <p>The write is closed in a {@code finally} block so it is released even when writing or
+     * committing throws.
+     *
+     * @param rows rows to write to the table under test
+     * @throws Exception if writing, committing or closing fails
+     */
+    protected void writeCommit(InternalRow... rows) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        try {
+            for (InternalRow row : rows) {
+                write.write(row);
+            }
+            writeBuilder.newCommit().commit(write.prepareCommit());
+        } finally {
+            write.close();
+        }
+    }
+
+    /**
+     * Triggers a compaction for the given single-column partition and commits the result.
+     *
+     * <p>The write is closed in a {@code finally} block so it is released even when compaction
+     * or commit throws.
+     *
+     * @param partition value of the single partition column
+     * @throws Exception if compaction, committing or closing fails
+     */
+    protected void compact(int partition) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        try {
+            // NOTE(review): the arguments look like bucket 0 and "full compaction" - confirm
+            // against BatchTableWrite#compact.
+            write.compact(BinaryRow.singleColumn(partition), 0, true);
+            writeBuilder.newCommit().commit(write.prepareCommit());
+        } finally {
+            write.close();
+        }
+    }
+
+    /** Reads the table projecting the columns at index 0, 1 and 2. */
+    protected List<GenericRow> query() throws Exception {
+        int[] firstThreeColumns = {0, 1, 2};
+        return query(firstThreeColumns);
+    }
+
+ protected List<GenericRow> query(int[] projection) throws Exception {
+ ReadBuilder readBuilder =
table.newReadBuilder().withProjection(projection);
+ List<GenericRow> rows = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .forEachRemaining(
+ r -> {
+ GenericRow newR = new
GenericRow(projection.length);
+ for (int i = 0; i < projection.length; i++) {
+ newR.setField(i, r.getInt(i));
+ }
+ rows.add(newR);
+ });
+ return rows;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
new file mode 100644
index 000000000..367a30b66
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks that expired records stay readable until a compaction drops them. */
+class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
+
+    @Override
+    protected Options tableOptions() {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
+        conf.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+        return conf;
+    }
+
+    @Test
+    public void test() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
+
+        // can be queried
+        assertThat(query())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 2));
+
+        int nowSecs = (int) (System.currentTimeMillis() / 1000);
+        int futureSecs = nowSecs + 60 * 60;
+        writeCommit(GenericRow.of(1, 3, nowSecs));
+        writeCommit(GenericRow.of(1, 4, futureSecs));
+        // wait longer than the 1 second expire time so the "now" row is expired
+        Thread.sleep(2000);
+
+        // no compaction, can be queried
+        assertThat(query())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 1),
+                        GenericRow.of(1, 2, 2),
+                        GenericRow.of(1, 3, nowSecs),
+                        GenericRow.of(1, 4, futureSecs));
+
+        // compact, expired
+        compact(1);
+        assertThat(query()).containsExactlyInAnyOrder(GenericRow.of(1, 4, futureSecs));
+        assertThat(query(new int[] {2})).containsExactlyInAnyOrder(GenericRow.of(futureSecs));
+    }
+}