This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eb6695f57 [core] Fix SST files written using SstFileWriter don't work
with TtlDB (#3292)
eb6695f57 is described below
commit eb6695f57730c5d86e617a7e7b710c4413ac2bd9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:37:56 2024 +0800
[core] Fix SST files written using SstFileWriter don't work with TtlDB
(#3292)
---
.../java/org/apache/paimon/lookup/BulkLoader.java | 19 ++++++++++++++++++
.../crosspartition/GlobalIndexAssignerTest.java | 23 ++++++++++++++++++++++
2 files changed, 42 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
index 75e6a90ea..0bcdbbb5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
@@ -25,6 +25,7 @@ import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
+import org.rocksdb.TtlDB;
import java.io.File;
import java.util.ArrayList;
@@ -39,8 +40,10 @@ public class BulkLoader {
private final ColumnFamilyHandle columnFamily;
private final String path;
private final RocksDB db;
+ private final boolean isTtlEnabled;
private final Options options;
private final List<String> files = new ArrayList<>();
+ private final int currentTimeSeconds;
private SstFileWriter writer = null;
private int sstIndex = 0;
@@ -48,9 +51,11 @@ public class BulkLoader {
public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle
columnFamily, String path) {
this.db = db;
+ this.isTtlEnabled = db instanceof TtlDB;
this.options = options;
this.columnFamily = columnFamily;
this.path = path;
+ this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
}
public void write(byte[] key, byte[] value) throws WriteException {
@@ -62,6 +67,10 @@ public class BulkLoader {
files.add(path);
}
+ if (isTtlEnabled) {
+ value = appendTimestamp(value);
+ }
+
try {
writer.put(key, value);
} catch (RocksDBException e) {
@@ -80,6 +89,16 @@ public class BulkLoader {
}
}
+ private byte[] appendTimestamp(byte[] value) {
+ byte[] newValue = new byte[value.length + 4];
+ System.arraycopy(value, 0, newValue, 0, value.length);
+ newValue[value.length] = (byte) (currentTimeSeconds & 0xff);
+ newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff);
+ newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) &
0xff);
+ newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) &
0xff);
+ return newValue;
+ }
+
public void finish() {
try {
if (writer != null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
index 347ede6e4..65d916439 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
@@ -243,4 +243,27 @@ public class GlobalIndexAssignerTest extends TableTestBase
{
output.clear();
assigner.close();
}
+
+ @Test
+ public void testBootstrapWithTTL() throws Exception {
+ // enableTtl is true
+ GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE,
true);
+ List<List<Integer>> output = new ArrayList<>();
+ assigner.open(
+ 0,
+ ioManager(),
+ 2,
+ 0,
+ (row, bucket) ->
+ output.add(
+ Arrays.asList(
+ row.getInt(0), row.getInt(1),
row.getInt(2), bucket)));
+
+ // assigner.bootstrapKey can trigger the problem
+ assigner.bootstrapKey(GenericRow.of(1, 1, 1));
+ assigner.processInput(GenericRow.of(1, 1, 1));
+ assigner.endBoostrap(true);
+
+ assertThat(output).containsExactlyInAnyOrder(Arrays.asList(1, 1, 1,
1));
+ }
}