This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24d690a6c3 [test] refactor commitSpanshot in RestCatalog and fix ut
(#5302)
24d690a6c3 is described below
commit 24d690a6c363cb84794104c1346d4081259918a5
Author: Xiaohu <[email protected]>
AuthorDate: Tue Mar 18 13:56:27 2025 +0800
[test] refactor commitSpanshot in RestCatalog and fix ut (#5302)
---
.../org/apache/paimon/rest/RESTCatalogServer.java | 102 +++++++++++++++++----
.../apache/paimon/rest/RESTCatalogTestBase.java | 17 +++-
2 files changed, 98 insertions(+), 21 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index e285a5facd..9654f2647c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -37,6 +37,7 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
@@ -562,16 +563,8 @@ public class RESTCatalogServer {
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
throw new Catalog.TableNotExistException(identifier);
}
- FileStoreTable table = getFileTable(identifier);
- RenamingSnapshotCommit commit =
- new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
- String branchName = identifier.getBranchName();
- if (branchName == null) {
- branchName = "main";
- }
boolean success =
- commit.commit(requestBody.getSnapshot(), branchName,
Collections.emptyList());
- commitSnapshot(identifier, requestBody.getSnapshot(), null);
+ commitSnapshot(identifier, requestBody.getSnapshot(),
requestBody.getStatistics());
CommitTableResponse response = new CommitTableResponse(success);
return mockResponse(response, 200);
}
@@ -1262,7 +1255,7 @@ public class RESTCatalogServer {
}
private boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot, List<Partition>
statistics)
+ Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
throws Catalog.TableNotExistException {
FileStoreTable table = getFileTable(identifier);
RenamingSnapshotCommit commit =
@@ -1273,6 +1266,7 @@ public class RESTCatalogServer {
}
try {
boolean success = commit.commit(snapshot, branchName,
Collections.emptyList());
+ // update snapshot and stats
tableSnapshotStore.compute(
identifier.getFullName(),
(k, old) -> {
@@ -1281,12 +1275,12 @@ public class RESTCatalogServer {
long fileCount = 0;
long lastFileCreationTime = 0;
if (statistics != null) {
- for (Partition partition : statistics) {
- recordCount += partition.recordCount();
- fileSizeInBytes += partition.fileSizeInBytes();
- fileCount += partition.fileCount();
- if (partition.lastFileCreationTime() >
lastFileCreationTime) {
- lastFileCreationTime =
partition.lastFileCreationTime();
+ for (PartitionStatistics stats : statistics) {
+ recordCount += stats.recordCount();
+ fileSizeInBytes += stats.fileSizeInBytes();
+ fileCount += stats.fileCount();
+ if (stats.lastFileCreationTime() >
lastFileCreationTime) {
+ lastFileCreationTime =
stats.lastFileCreationTime();
}
}
}
@@ -1305,6 +1299,82 @@ public class RESTCatalogServer {
lastFileCreationTime,
fileSizeInBytes);
});
+ // upsert partitions stats
+ if (!tablePartitionsStore.containsKey(identifier.getFullName())) {
+ if (statistics != null) {
+ List<Partition> newPartitions =
+ statistics.stream()
+ .map(
+ stats ->
+ new Partition(
+ stats.spec(),
+
stats.recordCount(),
+
stats.fileSizeInBytes(),
+ stats.fileCount(),
+
stats.lastFileCreationTime(),
+ false))
+ .collect(Collectors.toList());
+ tablePartitionsStore.put(identifier.getFullName(),
newPartitions);
+ }
+ } else {
+ tablePartitionsStore.compute(
+ identifier.getFullName(),
+ (k, oldPartitions) -> {
+ if (oldPartitions == null || statistics == null) {
+ return oldPartitions;
+ }
+ Map<Map<String, String>, PartitionStatistics>
partitionStatisticsMap =
+ statistics.stream()
+ .collect(
+ Collectors.toMap(
+
PartitionStatistics::spec,
+ y -> y,
+ (a, b) -> a));
+ List<Partition> updatedPartitions =
+ oldPartitions.stream()
+ .map(
+ oldPartition -> {
+ PartitionStatistics
stats =
+
partitionStatisticsMap.get(
+
oldPartition.spec());
+ if (stats == null) {
+ return
oldPartition; // 如果没有新的统计信息,保持原样
+ }
+ return new Partition(
+
oldPartition.spec(),
+
oldPartition.recordCount()
+ +
stats.recordCount(),
+
oldPartition.fileSizeInBytes()
+ +
stats.fileSizeInBytes(),
+
oldPartition.fileCount()
+ +
stats.fileCount(),
+ Math.max(
+
oldPartition
+
.lastFileCreationTime(),
+ stats
+
.lastFileCreationTime()),
+
oldPartition.done());
+ })
+ .collect(Collectors.toList());
+ return updatedPartitions;
+ });
+ }
+ // clean up partitions
+ tablePartitionsStore
+ .entrySet()
+ .removeIf(
+ entry -> {
+ List<Partition> partitions = entry.getValue();
+ if (partitions == null) {
+ return true;
+ }
+ partitions.removeIf(
+ partition ->
+ partition.fileSizeInBytes() <= 0
+ &&
partition.fileCount() <= 0
+ &&
partition.recordCount() <= 0);
+ return partitions.isEmpty();
+ });
return success;
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
index e7d08e2c0b..1a6b80ffe3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
@@ -597,11 +597,18 @@ public abstract class RESTCatalogTestBase extends
CatalogTestBase {
Identifier branchIdentifier = new Identifier("test_db", "test_table",
branchName);
assertThrows(
Catalog.TableNotExistException.class, () ->
restCatalog.listPartitions(identifier));
-
- createTable(
+ restCatalog.createDatabase(identifier.getDatabaseName(), true);
+ restCatalog.createTable(
identifier,
- ImmutableMap.of(METASTORE_PARTITIONED_TABLE.key(), "" + true),
- Lists.newArrayList("col1"));
+ new Schema(
+ Lists.newArrayList(
+ new DataField(0, "col1", DataTypes.INT()),
+ new DataField(1, "dt", DataTypes.STRING())),
+ Arrays.asList("dt"),
+ Collections.emptyList(),
+ ImmutableMap.of(METASTORE_PARTITIONED_TABLE.key(), ""
+ true),
+ ""),
+ true);
List<Partition> result = catalog.listPartitions(identifier);
assertEquals(0, result.size());
List<Map<String, String>> partitionSpecs =
@@ -1007,7 +1014,7 @@ public abstract class RESTCatalogTestBase extends
CatalogTestBase {
@Override
protected boolean supportPartitions() {
// TODO support this
- return false;
+ return true;
}
@Override