This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eb6695f57 [core] Fix SST files written using SstFileWriter don't work 
with TtlDB (#3292)
eb6695f57 is described below

commit eb6695f57730c5d86e617a7e7b710c4413ac2bd9
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:37:56 2024 +0800

    [core] Fix SST files written using SstFileWriter don't work with TtlDB 
(#3292)
---
 .../java/org/apache/paimon/lookup/BulkLoader.java  | 19 ++++++++++++++++++
 .../crosspartition/GlobalIndexAssignerTest.java    | 23 ++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
index 75e6a90ea..0bcdbbb5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
@@ -25,6 +25,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.SstFileWriter;
+import org.rocksdb.TtlDB;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -39,8 +40,10 @@ public class BulkLoader {
     private final ColumnFamilyHandle columnFamily;
     private final String path;
     private final RocksDB db;
+    private final boolean isTtlEnabled;
     private final Options options;
     private final List<String> files = new ArrayList<>();
+    private final int currentTimeSeconds;
 
     private SstFileWriter writer = null;
     private int sstIndex = 0;
@@ -48,9 +51,11 @@ public class BulkLoader {
 
     public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle 
columnFamily, String path) {
         this.db = db;
+        this.isTtlEnabled = db instanceof TtlDB;
         this.options = options;
         this.columnFamily = columnFamily;
         this.path = path;
+        this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
     }
 
     public void write(byte[] key, byte[] value) throws WriteException {
@@ -62,6 +67,10 @@ public class BulkLoader {
                 files.add(path);
             }
 
+            if (isTtlEnabled) {
+                value = appendTimestamp(value);
+            }
+
             try {
                 writer.put(key, value);
             } catch (RocksDBException e) {
@@ -80,6 +89,16 @@ public class BulkLoader {
         }
     }
 
+    private byte[] appendTimestamp(byte[] value) {
+        byte[] newValue = new byte[value.length + 4];
+        System.arraycopy(value, 0, newValue, 0, value.length);
+        newValue[value.length] = (byte) (currentTimeSeconds & 0xff);
+        newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff);
+        newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) & 
0xff);
+        newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) & 
0xff);
+        return newValue;
+    }
+
     public void finish() {
         try {
             if (writer != null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
index 347ede6e4..65d916439 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java
@@ -243,4 +243,27 @@ public class GlobalIndexAssignerTest extends TableTestBase 
{
         output.clear();
         assigner.close();
     }
+
+    @Test
+    public void testBootstrapWithTTL() throws Exception {
+        // enableTtl is true
+        GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, 
true);
+        List<List<Integer>> output = new ArrayList<>();
+        assigner.open(
+                0,
+                ioManager(),
+                2,
+                0,
+                (row, bucket) ->
+                        output.add(
+                                Arrays.asList(
+                                        row.getInt(0), row.getInt(1), 
row.getInt(2), bucket)));
+
+        // assigner.bootstrapKey can trigger the problem
+        assigner.bootstrapKey(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.endBoostrap(true);
+
+        assertThat(output).containsExactlyInAnyOrder(Arrays.asList(1, 1, 1, 
1));
+    }
 }

Reply via email to