This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 55c038f87 [core] Use binary search to optimize SnapshotManager#earlierThanTimeMills (#2805)
55c038f87 is described below

commit 55c038f87f3d2d5c123fade53826079154a97a11
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 22 13:30:52 2024 +0800

    [core] Use binary search to optimize SnapshotManager#earlierThanTimeMills (#2805)
---
 .../org/apache/paimon/utils/SnapshotManager.java   | 29 ++++---
 .../apache/paimon/utils/SnapshotManagerTest.java   | 89 +++++++++++++++++-----
 2 files changed, 89 insertions(+), 29 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index e614af7c8..b330fc303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -181,23 +181,30 @@ public class SnapshotManager implements Serializable {
     }
 
     /**
-     * Returns a snapshot earlier than the timestamp mills. A non-existent snapshot may be returned
-     * if all snapshots are later than the timestamp mills.
+     * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
+     * returned if all snapshots are equal to or later than the timestamp mills.
      */
     public @Nullable Long earlierThanTimeMills(long timestampMills) {
         Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
+
         if (earliest == null || latest == null) {
             return null;
         }
 
-        for (long i = latest; i >= earliest; i--) {
-            long commitTime = snapshot(i).timeMillis();
-            if (commitTime < timestampMills) {
-                return i;
+        if (snapshot(earliest).timeMillis() >= timestampMills) {
+            return earliest - 1;
+        }
+
+        while (earliest < latest) {
+            long mid = (earliest + latest + 1) / 2;
+            if (snapshot(mid).timeMillis() < timestampMills) {
+                earliest = mid;
+            } else {
+                latest = mid - 1;
             }
         }
-        return earliest - 1;
+        return earliest;
     }
 
     /**
@@ -214,7 +221,7 @@ public class SnapshotManager implements Serializable {
         if (snapshot(earliest).timeMillis() > timestampMills) {
             return null;
         }
-        Snapshot finnalSnapshot = null;
+        Snapshot finalSnapshot = null;
         while (earliest <= latest) {
             long mid = earliest + (latest - earliest) / 2; // Avoid overflow
             Snapshot snapshot = snapshot(mid);
@@ -223,13 +230,13 @@ public class SnapshotManager implements Serializable {
                 latest = mid - 1; // Search in the left half
             } else if (commitTime < timestampMills) {
                 earliest = mid + 1; // Search in the right half
-                finnalSnapshot = snapshot;
+                finalSnapshot = snapshot;
             } else {
-                finnalSnapshot = snapshot; // Found the exact match
+                finalSnapshot = snapshot; // Found the exact match
                 break;
             }
         }
-        return finnalSnapshot;
+        return finalSnapshot;
     }
 
     public long snapshotCount() throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 9778ba3be..7294e3810 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -28,8 +28,12 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -49,6 +53,52 @@ public class SnapshotManagerTest {
         }
     }
 
+    @Test
+    public void testEarlierThanTimeMillis() throws IOException {
+        long base = System.currentTimeMillis();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        int numSnapshots = random.nextInt(1, 20);
+        Set<Long> set = new HashSet<>();
+        while (set.size() < numSnapshots) {
+            set.add(base + random.nextLong(0, 1_000_000));
+        }
+        List<Long> millis = set.stream().sorted().collect(Collectors.toList());
+
+        FileIO localFileIO = LocalFileIO.create();
+        SnapshotManager snapshotManager =
+                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+        int firstSnapshotId = random.nextInt(1, 100);
+        for (int i = 0; i < numSnapshots; i++) {
+            Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i));
+            localFileIO.writeFileUtf8(
+                    snapshotManager.snapshotPath(firstSnapshotId + i), snapshot.toJson());
+        }
+
+        for (int tries = 0; tries < 10; tries++) {
+            long time;
+            if (random.nextBoolean()) {
+                // pick a random time
+                time = base + random.nextLong(0, 1_000_000);
+            } else {
+                // pick a random time equal to one of the snapshots
+                time = millis.get(random.nextInt(numSnapshots));
+            }
+            Long actual = snapshotManager.earlierThanTimeMills(time);
+
+            if (millis.get(numSnapshots - 1) < time) {
+                assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
+            } else {
+                for (int i = 0; i < numSnapshots; i++) {
+                    if (millis.get(i) >= time) {
+                        assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
     @Test
     public void testEarlierOrEqualTimeMills() throws IOException {
         long millis = 1684726826L;
@@ -57,24 +107,7 @@ public class SnapshotManagerTest {
                 new SnapshotManager(localFileIO, new Path(tempDir.toString()));
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
-            Snapshot snapshot =
-                    new Snapshot(
-                            i,
-                            0L,
-                            null,
-                            null,
-                            null,
-                            null,
-                            null,
-                            0L,
-                            Snapshot.CommitKind.APPEND,
-                            millis + i * 1000,
-                            null,
-                            null,
-                            null,
-                            null,
-                            null,
-                            null);
+            Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
             localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), snapshot.toJson());
         }
         // smaller than the second snapshot return the first snapshot
@@ -88,6 +121,26 @@ public class SnapshotManagerTest {
                 .isEqualTo(millis + 1000);
     }
 
+    private Snapshot createSnapshotWithMillis(long id, long millis) {
+        return new Snapshot(
+                id,
+                0L,
+                null,
+                null,
+                null,
+                null,
+                null,
+                0L,
+                Snapshot.CommitKind.APPEND,
+                millis,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null);
+    }
+
     @Test
     public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException {
         FileIO localFileIO = LocalFileIO.create();

Reply via email to