This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d5290fc7e [core] Reduce File IO for snapshot read (#3849)
d5290fc7e is described below

commit d5290fc7e7fb8f256d1b3ee026812b9ec4762927
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 30 22:40:49 2024 +0800

    [core] Reduce File IO for snapshot read (#3849)
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java          | 13 +++++++++----
 paimon-core/src/main/java/org/apache/paimon/Snapshot.java   | 10 ++++++----
 .../paimon/table/source/snapshot/FullStartingScanner.java   |  5 ++++-
 .../main/java/org/apache/paimon/utils/SnapshotManager.java  |  8 --------
 .../org/apache/paimon/table/TableFormatReadWriteTest.java   |  4 ++--
 .../apache/paimon/table/TableFormatReadWriteWithPkTest.java |  4 ++--
 .../scala/org/apache/paimon/spark/PaimonHiveTestBase.scala  |  1 +
 .../scala/org/apache/paimon/spark/PaimonSparkTestBase.scala |  1 +
 ...chProcedureTest.scala => FastForwardProcedureTest.scala} |  0
 9 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index cd9022e23..fe2e31b8d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -297,15 +297,17 @@ public interface FileIO extends Serializable {
     /** Read file from {@link #overwriteFileUtf8} file. */
     default Optional<String> readOverwrittenFileUtf8(Path path) throws 
IOException {
         int retryNumber = 0;
-        IOException exception = null;
+        Exception exception = null;
         while (retryNumber++ < 5) {
             try {
+                return Optional.of(readFileUtf8(path));
+            } catch (FileNotFoundException e) {
+                return Optional.empty();
+            } catch (Exception e) {
                 if (!exists(path)) {
                     return Optional.empty();
                 }
 
-                return Optional.of(readFileUtf8(path));
-            } catch (IOException e) {
                 if (e.getClass()
                         .getName()
                         
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
@@ -322,7 +324,10 @@ public interface FileIO extends Serializable {
             }
         }
 
-        throw exception;
+        if (exception instanceof IOException) {
+            throw (IOException) exception;
+        }
+        throw new RuntimeException(exception);
     }
 
     // 
-------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 6c2656d39..6102c321d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -440,8 +440,7 @@ public class Snapshot {
 
     public static Snapshot fromPath(FileIO fileIO, Path path) {
         try {
-            String json = fileIO.readFileUtf8(path);
-            return Snapshot.fromJson(json);
+            return fromPathThrowsException(fileIO, path);
         } catch (IOException e) {
             throw new RuntimeException("Fails to read snapshot from path " + 
path, e);
         }
@@ -450,13 +449,16 @@ public class Snapshot {
     @Nullable
     public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws 
IOException {
         try {
-            String json = fileIO.readFileUtf8(path);
-            return Snapshot.fromJson(json);
+            return fromPathThrowsException(fileIO, path);
         } catch (FileNotFoundException e) {
             return null;
         }
     }
 
+    private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) 
throws IOException {
+        return Snapshot.fromJson(fileIO.readFileUtf8(path));
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index c177be7ea..fc9b49d2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -42,7 +42,10 @@ public class FullStartingScanner extends AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader snapshotReader) {
-        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            // try to get first snapshot again
+            startingSnapshotId = snapshotManager.latestSnapshotId();
+        }
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
             return new NoSnapshot();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index cf0b44b5b..48627957c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -611,10 +611,6 @@ public class SnapshotManager implements Serializable {
 
     private @Nullable Long findLatest(Path dir, String prefix, Function<Long, 
Path> file)
             throws IOException {
-        if (!fileIO.exists(dir)) {
-            return null;
-        }
-
         Long snapshotId = readHint(LATEST, dir);
         if (snapshotId != null && snapshotId > 0) {
             long nextSnapshot = snapshotId + 1;
@@ -628,10 +624,6 @@ public class SnapshotManager implements Serializable {
 
     private @Nullable Long findEarliest(Path dir, String prefix, 
Function<Long, Path> file)
             throws IOException {
-        if (!fileIO.exists(dir)) {
-            return null;
-        }
-
         Long snapshotId = readHint(EARLIEST, dir);
         // null and it is the earliest only it exists
         if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
index 070923249..d303dd862 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
@@ -34,8 +34,8 @@ import java.util.List;
 public class TableFormatReadWriteTest extends TableTestBase {
 
     private Table createTable(String format) throws Exception {
-        catalog.createTable(identifier(), schema(format), true);
-        return catalog.getTable(identifier());
+        catalog.createTable(identifier(format), schema(format), true);
+        return catalog.getTable(identifier(format));
     }
 
     private Schema schema(String format) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
index 9fc3e656d..8adfcdc89 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
@@ -34,8 +34,8 @@ import java.util.List;
 public class TableFormatReadWriteWithPkTest extends TableTestBase {
 
     private Table createTable(String format) throws Exception {
-        catalog.createTable(identifier(), schema(format), true);
-        return catalog.getTable(identifier());
+        catalog.createTable(identifier(format), schema(format), true);
+        return catalog.getTable(identifier(format));
     }
 
     private Schema schema(String format) {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 842147615..ccd705e26 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.Snapshot
 import org.apache.paimon.hive.TestHiveMetastore
 
 import org.apache.hadoop.conf.Configuration
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 983dd037f..19e711a60 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.Snapshot
 import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, 
Identifier}
 import org.apache.paimon.options.{CatalogOptions, Options}
 import org.apache.paimon.spark.catalog.Catalogs
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala
similarity index 100%
rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala
rename to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala

Reply via email to