This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2b5089cf5d [core] ensure that record-expire takes effect when full compaction (#5255)
2b5089cf5d is described below
commit 2b5089cf5df2e36e6dc18420e0162e408dc3e0f4
Author: LsomeYeah <[email protected]>
AuthorDate: Fri Mar 14 11:41:45 2025 +0800
[core] ensure that record-expire takes effect when full compaction (#5255)
---
.../org/apache/paimon/io/RecordLevelExpire.java | 87 +++++++++-
.../paimon/mergetree/compact/CompactStrategy.java | 29 +++-
.../mergetree/compact/MergeTreeCompactManager.java | 14 +-
.../mergetree/compact/MergeTreeCompactTask.java | 39 ++++-
.../paimon/operation/KeyValueFileStoreWrite.java | 5 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +-
.../compact/MergeTreeCompactManagerTest.java | 9 +-
.../apache/paimon/table/RecordLevelExpireTest.java | 177 +++++++++++++++++++++
8 files changed, 342 insertions(+), 24 deletions(-)
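
For readers who want to see what the fixed behavior applies to, here is a minimal sketch (illustrative only, not part of the commit) of the table options that drive record-level expire. The option keys follow the Paimon documentation; the column name "event_time" and the surrounding method context are assumptions.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical setup inside a test or writer method: with these options, a forced full
    // compaction after this fix will also rewrite max-level files whose minimum "event_time"
    // is already past the expire horizon.
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("record-level.expire-time", "1 h");       // records older than 1 hour may be dropped
    tableOptions.put("record-level.time-field", "event_time"); // epoch-seconds column used for expiration
    tableOptions.put("full-compaction.delta-commits", "5");    // request a full compaction periodically
    table = table.copy(tableOptions);                          // FileStoreTable#copy, as also used in the test below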
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 05c70dcbe1..e8cae827ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -22,6 +22,12 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.KeyValueFieldsExtractor;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStatsEvolution;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
@@ -30,20 +36,33 @@ import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordLevelExpire.class);
+
private final int expireTime;
private final Function<InternalRow, Optional<Integer>> fieldGetter;
+ private final ConcurrentMap<Long, TableSchema> tableSchemas;
+ private final TableSchema schema;
+ private final SchemaManager schemaManager;
+ private final SimpleStatsEvolutions fieldValueStatsConverters;
+
@Nullable
- public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
+ public static RecordLevelExpire create(
+ CoreOptions options, TableSchema schema, SchemaManager schemaManager) {
Duration expireTime = options.recordLevelExpireTime();
if (expireTime == null) {
return null;
@@ -56,6 +75,7 @@ public class RecordLevelExpire {
}
// should no project here, record level expire only works in compaction
+ RowType rowType = schema.logicalRowType();
int fieldIndex = rowType.getFieldIndex(timeFieldName);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
@@ -66,13 +86,69 @@ public class RecordLevelExpire {
DataType dataType = rowType.getField(timeFieldName).type();
Function<InternalRow, Optional<Integer>> fieldGetter =
createFieldGetter(dataType, fieldIndex);
- return new RecordLevelExpire((int) expireTime.getSeconds(), fieldGetter);
+
+ LOG.info(
+ "Create RecordExpire. expireTime is {}s,timeField is {}",
+ (int) expireTime.getSeconds(),
+ timeFieldName);
+ return new RecordLevelExpire(
+ (int) expireTime.getSeconds(), fieldGetter, schema, schemaManager);
}
private RecordLevelExpire(
- int expireTime, Function<InternalRow, Optional<Integer>> fieldGetter) {
+ int expireTime,
+ Function<InternalRow, Optional<Integer>> fieldGetter,
+ TableSchema schema,
+ SchemaManager schemaManager) {
this.expireTime = expireTime;
this.fieldGetter = fieldGetter;
+
+ this.tableSchemas = new ConcurrentHashMap<>();
+ this.schema = schema;
+ this.schemaManager = schemaManager;
+
+ KeyValueFieldsExtractor extractor =
+ PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
+
+ fieldValueStatsConverters =
+ new SimpleStatsEvolutions(
+ sid -> extractor.valueFields(scanTableSchema(sid)), schema.id());
+ }
+
+ public boolean isExpireFile(DataFileMeta file) {
+ InternalRow minValues = file.valueStats().minValues();
+
+ if (file.schemaId() != schema.id() || file.valueStatsCols() != null) {
+ // In the following cases, can not read minValues with field index directly
+ //
+ // 1. if the table had suffered schema evolution, read minValues with new field index
+ // may cause exception.
+ // 2. if metadata.stats-dense-store = true, minValues may not contain all data fields
+ // which may cause exception when reading with origin field index
+ SimpleStatsEvolution.Result result =
+ fieldValueStatsConverters
+ .getOrCreate(file.schemaId())
+ .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
+ minValues = result.minValues();
+ }
+
+ int currentTime = (int) (System.currentTimeMillis() / 1000);
+ Optional<Integer> minTime = fieldGetter.apply(minValues);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "expire time is {}, currentTime is {}, file min time for time field is {}. "
+ + "file name is {}, file level is {}, file schema id is {}, file valueStatsCols is {}",
+ expireTime,
+ currentTime,
+ minTime.isPresent() ? minTime.get() : "empty",
+ file.fileName(),
+ file.level(),
+ file.schemaId(),
+ file.valueStatsCols());
+ }
+
+ return minTime.map(minValue -> currentTime - expireTime > minValue).orElse(false);
}
public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
@@ -130,4 +206,9 @@ public class RecordLevelExpire {
return fieldGetter;
}
+
+ private TableSchema scanTableSchema(long id) {
+ return tableSchemas.computeIfAbsent(
+ id, key -> key == schema.id() ? schema : schemaManager.schema(id));
+ }
}
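
The new isExpireFile check above reduces to a simple epoch-seconds comparison. A minimal standalone sketch (illustrative only; the method name and parameters are not the committed API):

    // A file is worth rewriting when "now - expire-time" has passed the minimum value of the
    // time field recorded in the file's value stats (all values in epoch seconds): such a file
    // is guaranteed to contain at least one expired record.
    static boolean fileHasExpiredRecords(int minTimeFieldSeconds, int expireTimeSeconds) {
        int currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
        return currentTimeSeconds - expireTimeSeconds > minTimeFieldSeconds;
    }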
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index 90471d1caf..23c09d3dc3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -19,8 +19,13 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.LevelSortedRun;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -39,11 +44,29 @@ public interface CompactStrategy {
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
/** Pick a compaction unit consisting of all existing files. */
- static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
+ static Optional<CompactUnit> pickFullCompaction(
+ int numLevels,
+ List<LevelSortedRun> runs,
+ @Nullable RecordLevelExpire recordLevelExpire) {
int maxLevel = numLevels - 1;
- if (runs.isEmpty() || (runs.size() == 1 && runs.get(0).level() == maxLevel)) {
- // no sorted run or only 1 sorted run on the max level, no need to compact
+ if (runs.isEmpty()) {
+ // no sorted run, no need to compact
return Optional.empty();
+ } else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
+ if (recordLevelExpire == null) {
+ // only 1 sorted run on the max level and don't check record-expire, no need to
+ // compact
+ return Optional.empty();
+ }
+
+ // pick the files which has expired records
+ List<DataFileMeta> filesContainExpireRecords = new ArrayList<>();
+ for (DataFileMeta file : runs.get(0).run().files()) {
+ if (recordLevelExpire.isExpireFile(file)) {
+ filesContainExpireRecords.add(file);
+ }
+ }
+ return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));
} else {
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f8d9a44e4e..a1b1954d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -27,6 +27,7 @@ import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -65,6 +66,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
private final boolean lazyGenDeletionFile;
private final boolean needLookup;
+ @Nullable private final RecordLevelExpire recordLevelExpire;
+
public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
@@ -76,7 +79,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
@Nullable CompactionMetrics.Reporter metricsReporter,
@Nullable DeletionVectorsMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
- boolean needLookup) {
+ boolean needLookup,
+ @Nullable RecordLevelExpire recordLevelExpire) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -87,6 +91,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
this.metricsReporter = metricsReporter;
this.dvMaintainer = dvMaintainer;
this.lazyGenDeletionFile = lazyGenDeletionFile;
+ this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
MetricUtils.safeCall(this::reportMetrics, LOG);
@@ -128,7 +133,9 @@ public class MergeTreeCompactManager extends CompactFutureManager {
"Trigger forced full compaction. Picking from the following runs\n{}",
runs);
}
- optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
+ optionalUnit =
+ CompactStrategy.pickFullCompaction(
+ levels.numberOfLevels(), runs, recordLevelExpire);
} else {
if (taskFuture != null) {
return;
@@ -201,7 +208,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
dropDelete,
levels.maxLevel(),
metricsReporter,
- compactDfSupplier);
+ compactDfSupplier,
+ recordLevelExpire);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index a1b64072d8..890302f4be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -24,6 +24,7 @@ import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -52,6 +53,8 @@ public class MergeTreeCompactTask extends CompactTask {
// metric
private int upgradeFilesNum;
+ @Nullable private final RecordLevelExpire recordLevelExpire;
+
public MergeTreeCompactTask(
Comparator<InternalRow> keyComparator,
long minFileSize,
@@ -60,7 +63,8 @@ public class MergeTreeCompactTask extends CompactTask {
boolean dropDelete,
int maxLevel,
@Nullable CompactionMetrics.Reporter metricsReporter,
- Supplier<CompactDeletionFile> compactDfSupplier) {
+ Supplier<CompactDeletionFile> compactDfSupplier,
+ @Nullable RecordLevelExpire recordLevelExpire) {
super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
@@ -69,6 +73,7 @@ public class MergeTreeCompactTask extends CompactTask {
this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
this.dropDelete = dropDelete;
this.maxLevel = maxLevel;
+ this.recordLevelExpire = recordLevelExpire;
this.upgradeFilesNum = 0;
}
@@ -117,19 +122,26 @@ public class MergeTreeCompactTask extends CompactTask {
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
if (file.level() == outputLevel) {
+ if (isContainExpiredRecords(file)) {
+ // if the large file in maxLevel has expired records, we need to rewrite it
+ rewriteFile(file, toUpdate);
+ }
return;
}
if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
- CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
- toUpdate.merge(upgradeResult);
- upgradeFilesNum++;
+ if (isContainExpiredRecords(file)) {
+ // if the file which could be directly upgraded has expired records, we need to
+ // rewrite it
+ rewriteFile(file, toUpdate);
+ } else {
+ CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
+ toUpdate.merge(upgradeResult);
+ upgradeFilesNum++;
+ }
} else {
// files with delete records should not be upgraded directly to max level
- List<List<SortedRun>> candidate = new ArrayList<>();
- candidate.add(new ArrayList<>());
- candidate.get(0).add(SortedRun.fromSingle(file));
- rewriteImpl(candidate, toUpdate);
+ rewriteFile(file, toUpdate);
}
}
@@ -158,4 +170,15 @@ public class MergeTreeCompactTask extends CompactTask {
toUpdate.merge(rewriteResult);
candidate.clear();
}
+
+ private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception {
+ List<List<SortedRun>> candidate = new ArrayList<>();
+ candidate.add(new ArrayList<>());
+ candidate.get(0).add(SortedRun.fromSingle(file));
+ rewriteImpl(candidate, toUpdate);
+ }
+
+ private boolean isContainExpiredRecords(DataFileMeta file) {
+ return recordLevelExpire != null && recordLevelExpire.isExpireFile(file);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 97865e763f..c439112064 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -158,7 +158,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
pathFactory,
extractor,
options);
- this.recordLevelExpire = RecordLevelExpire.create(options, valueType);
+ this.recordLevelExpire = RecordLevelExpire.create(options, schema, schemaManager);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
@@ -281,7 +281,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction(),
- options.needLookup());
+ options.needLookup(),
+ recordLevelExpire);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 01a9326aed..1207daa86c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -541,7 +541,8 @@ public abstract class MergeTreeTestBase {
null,
null,
false,
- options.needLookup());
+ options.needLookup(),
+ null);
}
static class MockFailResultCompactionManager extends MergeTreeCompactManager {
@@ -564,7 +565,8 @@ public abstract class MergeTreeTestBase {
null,
null,
false,
- false);
+ false,
+ null);
}
protected CompactResult obtainCompactResult()
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index e7fa767b4b..4240555977 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -207,7 +207,8 @@ public class MergeTreeCompactManagerTest {
null,
null,
false,
- true);
+ true,
+ null);
MergeTreeCompactManager defaultManager =
new MergeTreeCompactManager(
@@ -221,7 +222,8 @@ public class MergeTreeCompactManagerTest {
null,
null,
false,
- false);
+ false,
+ null);
assertThat(lookupManager.compactNotCompleted()).isTrue();
assertThat(defaultManager.compactNotCompleted()).isFalse();
@@ -256,7 +258,8 @@ public class MergeTreeCompactManagerTest {
null,
null,
false,
- false);
+ false,
+ null);
manager.triggerCompaction(false);
manager.getCompactionResult(true);
List<LevelMinMax> outputs =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
index bce57c2752..cb5c517973 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireTest.java
@@ -26,8 +26,12 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;
@@ -35,6 +39,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -114,4 +122,173 @@ class RecordLevelExpireTest extends PrimaryKeyTableTestBase {
GenericRow.of(1, 4, currentSecs + 60 * 60),
GenericRow.of(1, 5, currentSecs + 60 * 60));
}
+
+ @Test
+ public void testIsExpireFile() throws Exception {
+ CoreOptions coreOptions = table.coreOptions();
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ table = table.copy(dynamicOptions);
+
+ // common case
+ int currentSecs = (int) (System.currentTimeMillis() / 1000);
+ writeCommit(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 2, currentSecs + 30 * 60),
+ GenericRow.of(1, 3, currentSecs - 60 * 60),
+ GenericRow.of(1, 4, currentSecs - 30 * 60));
+
+ writeCommit(
+ GenericRow.of(1, 5, currentSecs + 60 * 60),
+ GenericRow.of(1, 6, currentSecs + 30 * 60),
+ GenericRow.of(1, 7, currentSecs + 20 * 60),
+ GenericRow.of(1, 8, currentSecs + 10 * 60));
+
+ RecordLevelExpire recordLevelExpire =
+ RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+ List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits1.size()).isEqualTo(1);
+ List<DataFileMeta> files1 = splits1.get(0).dataFiles();
+ assertThat(files1.size()).isEqualTo(2);
+ assertThat(recordLevelExpire.isExpireFile(files1.get(0))).isTrue();
+ assertThat(recordLevelExpire.isExpireFile(files1.get(1))).isFalse();
+
+ // schema evolution
+ table.schemaManager()
+ .commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "col0",
+ DataTypes.INT(),
+ null,
+ SchemaChange.Move.after("col0", "pk"))));
+ refreshTable();
+
+ recordLevelExpire =
+ RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+ List<DataSplit> splits2 = table.newSnapshotReader().read().dataSplits();
+ List<DataFileMeta> files2 = splits2.get(0).dataFiles();
+ assertThat(recordLevelExpire.isExpireFile(files2.get(0))).isTrue();
+ assertThat(recordLevelExpire.isExpireFile(files2.get(1))).isFalse();
+
+ // metadata.stats-dense-store = true
+ dynamicOptions.put(CoreOptions.METADATA_STATS_DENSE_STORE.key(), "true");
+ dynamicOptions.put(CoreOptions.METADATA_STATS_MODE.key(), "none");
+ dynamicOptions.put("fields.col1.stats-mode", "full");
+ table = table.copy(dynamicOptions);
+
+ writeCommit(
+ GenericRow.of(1, 9, 9, currentSecs + 60 * 60),
+ GenericRow.of(1, 10, 10, currentSecs + 30 * 60));
+ writeCommit(
+ GenericRow.of(1, 11, 11, currentSecs + 60 * 60),
+ GenericRow.of(1, 12, 12, currentSecs - 30 * 60));
+
+ recordLevelExpire =
+ RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+ List<DataSplit> splits3 = table.newSnapshotReader().read().dataSplits();
+ List<DataFileMeta> files3 = splits3.get(0).dataFiles();
+ assertThat(recordLevelExpire.isExpireFile(files3.get(0))).isTrue();
+ assertThat(recordLevelExpire.isExpireFile(files3.get(1))).isFalse();
+ assertThat(recordLevelExpire.isExpireFile(files3.get(2))).isFalse();
+ assertThat(recordLevelExpire.isExpireFile(files3.get(3))).isTrue();
+
+ // schema evolution again, change the valueCols
+ table.schemaManager()
+ .commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "col2",
+ DataTypes.INT(),
+ null,
+ SchemaChange.Move.after("col2", "pk"))));
+ refreshTable();
+
+ // new files has no stats for record-level.time-field
+ dynamicOptions.put("fields.col1.stats-mode", "none");
+ dynamicOptions.put("fields.col2.stats-mode", "full");
+ table = table.copy(dynamicOptions);
+
+ writeCommit(
+ GenericRow.of(1, 13, 13, 13, currentSecs + 60 * 60),
+ GenericRow.of(1, 14, 14, 14, currentSecs + 30 * 60));
+ writeCommit(
+ GenericRow.of(1, 15, 15, 15, currentSecs + 60 * 60),
+ GenericRow.of(1, 16, 16, 16, currentSecs - 30 * 60));
+
+ recordLevelExpire =
+ RecordLevelExpire.create(coreOptions, table.schema(), table.schemaManager());
+ List<DataSplit> splits4 = table.newSnapshotReader().read().dataSplits();
+ List<DataFileMeta> files4 = splits4.get(0).dataFiles();
+ // old files with record-level.time-field stats
+ assertThat(recordLevelExpire.isExpireFile(files4.get(0))).isTrue();
+ assertThat(recordLevelExpire.isExpireFile(files4.get(1))).isFalse();
+ assertThat(recordLevelExpire.isExpireFile(files4.get(2))).isFalse();
+ assertThat(recordLevelExpire.isExpireFile(files4.get(3))).isTrue();
+ // new files without record-level.time-field stats, cannot expire records
+ assertThat(recordLevelExpire.isExpireFile(files4.get(4))).isFalse();
+ assertThat(recordLevelExpire.isExpireFile(files4.get(5))).isFalse();
+ }
+
+ @Test
+ public void testTotallyExpire() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1500 B");
+ table = table.copy(map);
+
+ int currentSecs = (int) (System.currentTimeMillis() / 1000);
+ // if seconds is too short, this test might file
+ int seconds = 5;
+
+ // large file A. It has no delete records and expired records, will be upgraded to maxLevel
+ // without rewriting when full compaction
+ writeCommit(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 2, currentSecs + seconds));
+ compact(1);
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 2, currentSecs + seconds));
+
+ // large file B. It has no delete records but has expired records
+ writeCommit(
+ GenericRow.of(1, 3, currentSecs + 60 * 60),
+ GenericRow.of(1, 4, currentSecs - 60 * 60));
+ // no full compaction, expired records can be queried
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 2, currentSecs + seconds),
+ GenericRow.of(1, 3, currentSecs + 60 * 60),
+ GenericRow.of(1, 4, currentSecs - 60 * 60));
+ compact(1);
+ List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits1.size()).isEqualTo(1);
+ assertThat(splits1.get(0).dataFiles().size()).isEqualTo(2);
+ // full compaction, expired records will be removed
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 2, currentSecs + seconds),
+ GenericRow.of(1, 3, currentSecs + 60 * 60));
+
+ // ensure (1, 2, currentSecs + seconds) out of date
+ Thread.sleep(seconds * 1000 + 2000);
+ compact(1);
+ assertThat(query())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, currentSecs + 60 * 60),
+ GenericRow.of(1, 3, currentSecs + 60 * 60));
+ }
+
+ private void refreshTable() throws Catalog.TableNotExistException {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+
+ table = (FileStoreTable) catalog.getTable(identifier);
+ }
}