This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d5290fc7e [core] Reduce File IO for snapshot read (#3849)
d5290fc7e is described below
commit d5290fc7e7fb8f256d1b3ee026812b9ec4762927
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 30 22:40:49 2024 +0800
[core] Reduce File IO for snapshot read (#3849)
---
.../src/main/java/org/apache/paimon/fs/FileIO.java | 13 +++++++++----
paimon-core/src/main/java/org/apache/paimon/Snapshot.java | 10 ++++++----
.../paimon/table/source/snapshot/FullStartingScanner.java | 5 ++++-
.../main/java/org/apache/paimon/utils/SnapshotManager.java | 8 --------
.../org/apache/paimon/table/TableFormatReadWriteTest.java | 4 ++--
.../apache/paimon/table/TableFormatReadWriteWithPkTest.java | 4 ++--
.../scala/org/apache/paimon/spark/PaimonHiveTestBase.scala | 1 +
.../scala/org/apache/paimon/spark/PaimonSparkTestBase.scala | 1 +
...chProcedureTest.scala => FastForwardProcedureTest.scala} | 0
9 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index cd9022e23..fe2e31b8d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -297,15 +297,17 @@ public interface FileIO extends Serializable {
/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
- IOException exception = null;
+ Exception exception = null;
while (retryNumber++ < 5) {
try {
+ return Optional.of(readFileUtf8(path));
+ } catch (FileNotFoundException e) {
+ return Optional.empty();
+ } catch (Exception e) {
if (!exists(path)) {
return Optional.empty();
}
- return Optional.of(readFileUtf8(path));
- } catch (IOException e) {
if (e.getClass()
.getName()
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
@@ -322,7 +324,10 @@ public interface FileIO extends Serializable {
}
}
- throw exception;
+ if (exception instanceof IOException) {
+ throw (IOException) exception;
+ }
+ throw new RuntimeException(exception);
}
// -------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 6c2656d39..6102c321d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -440,8 +440,7 @@ public class Snapshot {
public static Snapshot fromPath(FileIO fileIO, Path path) {
try {
- String json = fileIO.readFileUtf8(path);
- return Snapshot.fromJson(json);
+ return fromPathThrowsException(fileIO, path);
} catch (IOException e) {
throw new RuntimeException("Fails to read snapshot from path " + path, e);
}
@@ -450,13 +449,16 @@ public class Snapshot {
@Nullable
public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
- String json = fileIO.readFileUtf8(path);
- return Snapshot.fromJson(json);
+ return fromPathThrowsException(fileIO, path);
} catch (FileNotFoundException e) {
return null;
}
}
+ private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) throws IOException {
+ return Snapshot.fromJson(fileIO.readFileUtf8(path));
+ }
+
@Override
public int hashCode() {
return Objects.hash(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index c177be7ea..fc9b49d2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -42,7 +42,10 @@ public class FullStartingScanner extends AbstractStartingScanner {
@Override
public Result scan(SnapshotReader snapshotReader) {
- Long startingSnapshotId = snapshotManager.latestSnapshotId();
+ if (startingSnapshotId == null) {
+ // try to get first snapshot again
+ startingSnapshotId = snapshotManager.latestSnapshotId();
+ }
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
return new NoSnapshot();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index cf0b44b5b..48627957c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -611,10 +611,6 @@ public class SnapshotManager implements Serializable {
private @Nullable Long findLatest(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
- if (!fileIO.exists(dir)) {
- return null;
- }
-
Long snapshotId = readHint(LATEST, dir);
if (snapshotId != null && snapshotId > 0) {
long nextSnapshot = snapshotId + 1;
@@ -628,10 +624,6 @@ public class SnapshotManager implements Serializable {
private @Nullable Long findEarliest(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
- if (!fileIO.exists(dir)) {
- return null;
- }
-
Long snapshotId = readHint(EARLIEST, dir);
// null and it is the earliest only it exists
if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
index 070923249..d303dd862 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java
@@ -34,8 +34,8 @@ import java.util.List;
public class TableFormatReadWriteTest extends TableTestBase {
private Table createTable(String format) throws Exception {
- catalog.createTable(identifier(), schema(format), true);
- return catalog.getTable(identifier());
+ catalog.createTable(identifier(format), schema(format), true);
+ return catalog.getTable(identifier(format));
}
private Schema schema(String format) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
index 9fc3e656d..8adfcdc89 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java
@@ -34,8 +34,8 @@ import java.util.List;
public class TableFormatReadWriteWithPkTest extends TableTestBase {
private Table createTable(String format) throws Exception {
- catalog.createTable(identifier(), schema(format), true);
- return catalog.getTable(identifier());
+ catalog.createTable(identifier(format), schema(format), true);
+ return catalog.getTable(identifier(format));
}
private Schema schema(String format) {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 842147615..ccd705e26 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.Snapshot
import org.apache.paimon.hive.TestHiveMetastore
import org.apache.hadoop.conf.Configuration
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 983dd037f..19e711a60 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.Snapshot
import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
import org.apache.paimon.options.{CatalogOptions, Options}
import org.apache.paimon.spark.catalog.Catalogs
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala
similarity index 100%
rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala
rename to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala