This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5245c1af [rest] Add commit statistics when commit to REST Server (#5152)
1d5245c1af is described below

commit 1d5245c1af57aee371092a8e66ce0d469eba61f7
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 19:20:44 2025 +0800

    [rest] Add commit statistics when commit to REST Server (#5152)
---
 .../org/apache/paimon/catalog/CatalogUtils.java    |  8 +--
 .../paimon/catalog/RenamingSnapshotCommit.java     |  5 +-
 .../org/apache/paimon/catalog/SnapshotCommit.java  |  4 +-
 .../org/apache/paimon/manifest/PartitionEntry.java | 11 ++++
 .../paimon/operation/FileStoreCommitImpl.java      | 61 +++++++++++++---------
 .../org/apache/paimon/partition/Partition.java     |  5 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  5 +-
 .../paimon/rest/RESTSnapshotCommitFactory.java     |  7 ++-
 .../paimon/rest/requests/CommitTableRequest.java   | 16 +++++-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  4 +-
 paimon-open-api/rest-catalog-open-api.yaml         |  4 ++
 11 files changed, 89 insertions(+), 41 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index de7ec83755..45c50e2bb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -147,13 +147,7 @@ public class CatalogUtils {
                 table.newReadBuilder().newScan().listPartitionEntries();
         List<Partition> partitions = new ArrayList<>(partitionEntries.size());
         for (PartitionEntry entry : partitionEntries) {
-            partitions.add(
-                    new Partition(
-                            computer.generatePartValues(entry.partition()),
-                            entry.recordCount(),
-                            entry.fileSizeInBytes(),
-                            entry.fileCount(),
-                            entry.lastFileCreationTime()));
+            partitions.add(entry.toPartition(computer));
         }
         return partitions;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
index cc7e4d06b5..4525838c67 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
@@ -23,10 +23,12 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 
@@ -49,7 +51,8 @@ public class RenamingSnapshotCommit implements SnapshotCommit 
{
     }
 
     @Override
-    public boolean commit(Snapshot snapshot, String branch) throws Exception {
+    public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics)
+            throws Exception {
         Path newSnapshotPath =
                 snapshotManager.branch().equals(branch)
                         ? snapshotManager.snapshotPath(snapshot.id())
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
index 1f472464c8..984556735e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SnapshotCommit.java
@@ -19,14 +19,16 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.io.Serializable;
+import java.util.List;
 
 /** Interface to commit snapshot atomically. */
 public interface SnapshotCommit extends AutoCloseable {
 
-    boolean commit(Snapshot snapshot, String branch) throws Exception;
+    boolean commit(Snapshot snapshot, String branch, List<Partition> statistics) throws Exception;
 
     /** Factory to create {@link SnapshotCommit}. */
     interface Factory extends Serializable {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index 1aa562444d..92c03f2c15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -21,7 +21,9 @@ package org.apache.paimon.manifest;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -83,6 +85,15 @@ public class PartitionEntry {
                 Math.max(lastFileCreationTime, entry.lastFileCreationTime));
     }
 
+    public Partition toPartition(InternalRowPartitionComputer computer) {
+        return new Partition(
+                computer.generatePartValues(partition),
+                recordCount,
+                fileSizeInBytes,
+                fileCount,
+                lastFileCreationTime);
+    }
+
     public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
         return fromDataFile(entry.partition(), entry.kind(), entry.file());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4a59de8dac..82ce3ad5c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -36,10 +36,12 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -55,6 +57,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
@@ -78,6 +81,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
@@ -137,6 +141,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final BucketMode bucketMode;
     private final long commitTimeout;
     private final int commitMaxRetries;
+    private final InternalRowPartitionComputer partitionComputer;
 
     private boolean ignoreEmptyCommit;
     private CommitMetrics commitMetrics;
@@ -198,6 +203,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         this.commitCallbacks = commitCallbacks;
         this.commitMaxRetries = commitMaxRetries;
         this.commitTimeout = commitTimeout;
+        this.partitionComputer =
+                new InternalRowPartitionComputer(
+                        options.partitionDefaultName(),
+                        partitionType,
+                        partitionType.getFieldNames().toArray(new String[0]),
+                        options.legacyPartitionName());
 
         this.ignoreEmptyCommit = true;
         this.commitMetrics = null;
@@ -492,7 +503,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 attempts +=
                         tryCommit(
                                 compactTableFiles,
-                                Collections.emptyList(),
+                                emptyList(),
                                 compactDvIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
@@ -508,9 +519,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             if (this.commitMetrics != null) {
                 reportCommit(
                         appendTableFiles,
-                        Collections.emptyList(),
+                        emptyList(),
                         compactTableFiles,
-                        Collections.emptyList(),
+                        emptyList(),
                         commitDuration,
                         generatedSnapshot,
                         attempts);
@@ -550,23 +561,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
 
         tryOverwrite(
-                partitionFilter,
-                Collections.emptyList(),
-                Collections.emptyList(),
-                commitIdentifier,
-                null,
-                new HashMap<>());
+                partitionFilter, emptyList(), emptyList(), commitIdentifier, 
null, new HashMap<>());
     }
 
     @Override
     public void truncateTable(long commitIdentifier) {
-        tryOverwrite(
-                null,
-                Collections.emptyList(),
-                Collections.emptyList(),
-                commitIdentifier,
-                null,
-                new HashMap<>());
+        tryOverwrite(null, emptyList(), emptyList(), commitIdentifier, null, 
new HashMap<>());
     }
 
     @Override
@@ -597,9 +597,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     public void commitStatistics(Statistics stats, long commitIdentifier) {
         String statsFileName = statsFileHandler.writeStats(stats);
         tryCommit(
-                Collections.emptyList(),
-                Collections.emptyList(),
-                Collections.emptyList(),
+                emptyList(),
+                emptyList(),
+                emptyList(),
                 commitIdentifier,
                 null,
                 Collections.emptyMap(),
@@ -809,7 +809,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
         return tryCommit(
                 changesWithOverwrite,
-                Collections.emptyList(),
+                emptyList(),
                 indexChangesWithOverwrite,
                 identifier,
                 watermark,
@@ -887,6 +887,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         Snapshot newSnapshot;
         String baseManifestList = null;
         String deltaManifestList = null;
+        List<PartitionEntry> deltaStatistics = null;
         String changelogManifestList = null;
         String oldIndexManifest = null;
         String indexManifest = null;
@@ -933,6 +934,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
             boolean rewriteIndexManifest = true;
             if (retryResult != null) {
+                deltaStatistics = retryResult.deltaStatistics;
                 deltaManifestList = retryResult.deltaManifestList;
                 changelogManifestList = retryResult.changelogManifestList;
                 if (Objects.equals(oldIndexManifest, 
retryResult.oldIndexManifest)) {
@@ -944,6 +946,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 }
             } else {
                 // write new delta files into manifest files
+                deltaStatistics = new 
ArrayList<>(PartitionEntry.merge(deltaFiles));
                 deltaManifestList = 
manifestList.write(manifestFile.write(deltaFiles));
 
                 // write changelog into manifest files
@@ -1009,7 +1012,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     e);
         }
 
-        if (commitSnapshotImpl(newSnapshot)) {
+        if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
                         String.format(
@@ -1031,6 +1034,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         newSnapshotId, commitUser, identifier, 
commitKind.name(), commitTime));
         cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
         return new RetryResult(
+                deltaStatistics,
                 deltaManifestList,
                 changelogManifestList,
                 oldIndexManifest,
@@ -1115,7 +1119,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
 
         String baseManifestList = manifestList.write(mergeAfterManifests);
-        String deltaManifestList = manifestList.write(Collections.emptyList());
+        String deltaManifestList = manifestList.write(emptyList());
 
         // prepare snapshot file
         Snapshot newSnapshot =
@@ -1137,7 +1141,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         latestSnapshot.watermark(),
                         latestSnapshot.statistics());
 
-        if (!commitSnapshotImpl(newSnapshot)) {
+        if (!commitSnapshotImpl(newSnapshot, emptyList())) {
             return new ManifestCompactResult(
                     baseManifestList, deltaManifestList, mergeBeforeManifests, 
mergeAfterManifests);
         } else {
@@ -1145,9 +1149,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
-    private boolean commitSnapshotImpl(Snapshot newSnapshot) {
+    private boolean commitSnapshotImpl(Snapshot newSnapshot, 
List<PartitionEntry> deltaStatistics) {
         try {
-            return snapshotCommit.commit(newSnapshot, branchName);
+            List<Partition> statistics = new 
ArrayList<>(deltaStatistics.size());
+            for (PartitionEntry entry : deltaStatistics) {
+                statistics.add(entry.toPartition(partitionComputer));
+            }
+            return snapshotCommit.commit(newSnapshot, branchName, statistics);
         } catch (Throwable e) {
             // exception when performing the atomic rename,
             // we cannot clean up because we can't determine the success
@@ -1517,6 +1525,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
     private class RetryResult implements CommitResult {
 
+        private final List<PartitionEntry> deltaStatistics;
         private final String deltaManifestList;
         private final String changelogManifestList;
 
@@ -1527,12 +1536,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         private final List<SimpleFileEntry> baseDataFiles;
 
         private RetryResult(
+                List<PartitionEntry> deltaStatistics,
                 String deltaManifestList,
                 String changelogManifestList,
                 String oldIndexManifest,
                 String newIndexManifest,
                 Snapshot latestSnapshot,
                 List<SimpleFileEntry> baseDataFiles) {
+            this.deltaStatistics = deltaStatistics;
             this.deltaManifestList = deltaManifestList;
             this.changelogManifestList = changelogManifestList;
             this.oldIndexManifest = oldIndexManifest;
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
index b13082fb44..ef52528daf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
@@ -29,7 +29,10 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
 
-/** Entry representing a partition. */
+/**
+ * Statistics of a partition, fields inside may be negative, indicating that some data has been
+ * removed.
+ */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @Public
 public class Partition implements Serializable {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 341379e523..1ac077cd9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -340,8 +340,9 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots {
         return Optional.of(response.getSnapshot());
     }
 
-    public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
-        CommitTableRequest request = new CommitTableRequest(identifier, 
snapshot);
+    public boolean commitSnapshot(
+            Identifier identifier, Snapshot snapshot, List<Partition> 
statistics) {
+        CommitTableRequest request = new CommitTableRequest(identifier, 
snapshot, statistics);
         CommitTableResponse response =
                 client.post(
                         
resourcePaths.commitTable(identifier.getDatabaseName()),
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
index 1a027d4163..87e0506036 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java
@@ -21,8 +21,11 @@ package org.apache.paimon.rest;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.List;
+
 /** Factory to create {@link SnapshotCommit} for REST Catalog. */
 public class RESTSnapshotCommitFactory implements SnapshotCommit.Factory {
 
@@ -39,11 +42,11 @@ public class RESTSnapshotCommitFactory implements 
SnapshotCommit.Factory {
         RESTCatalog catalog = loader.load();
         return new SnapshotCommit() {
             @Override
-            public boolean commit(Snapshot snapshot, String branch) {
+            public boolean commit(Snapshot snapshot, String branch, 
List<Partition> statistics) {
                 Identifier newIdentifier =
                         new Identifier(
                                 identifier.getDatabaseName(), 
identifier.getTableName(), branch);
-                return catalog.commitSnapshot(newIdentifier, snapshot);
+                return catalog.commitSnapshot(newIdentifier, snapshot, 
statistics);
             }
 
             @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
index de8474c1d3..cd7135691b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.rest.requests;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.RESTRequest;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -27,12 +28,15 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGet
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.List;
+
 /** Request for committing snapshot to table. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class CommitTableRequest implements RESTRequest {
 
     private static final String FIELD_IDENTIFIER = "identifier";
     private static final String FIELD_SNAPSHOT = "snapshot";
+    private static final String FIELD_STATISTICS = "statistics";
 
     @JsonProperty(FIELD_IDENTIFIER)
     private final Identifier identifier;
@@ -40,12 +44,17 @@ public class CommitTableRequest implements RESTRequest {
     @JsonProperty(FIELD_SNAPSHOT)
     private final Snapshot snapshot;
 
+    @JsonProperty(FIELD_STATISTICS)
+    private final List<Partition> statistics;
+
     @JsonCreator
     public CommitTableRequest(
             @JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
-            @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot) {
+            @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
+            @JsonProperty(FIELD_STATISTICS) List<Partition> statistics) {
         this.identifier = identifier;
         this.snapshot = snapshot;
+        this.statistics = statistics;
     }
 
     @JsonGetter(FIELD_IDENTIFIER)
@@ -57,4 +66,9 @@ public class CommitTableRequest implements RESTRequest {
     public Snapshot getSnapshot() {
         return snapshot;
     }
+
+    @JsonGetter(FIELD_STATISTICS)
+    public List<Partition> getStatistics() {
+        return statistics;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 04f514ef2e..a7938fd00f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -70,6 +70,7 @@ import okhttp3.mockwebserver.RecordedRequest;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -406,7 +407,8 @@ public class RESTCatalogServer {
         if (branchName == null) {
             branchName = "main";
         }
-        boolean success = commit.commit(requestBody.getSnapshot(), branchName);
+        boolean success =
+                commit.commit(requestBody.getSnapshot(), branchName, 
Collections.emptyList());
         CommitTableResponse response = new CommitTableResponse(success);
         return mockResponse(response, 200);
     }
diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml
index 4b36bcb3fd..ef136c2359 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1186,6 +1186,10 @@ components:
           $ref: '#/components/schemas/Identifier'
         snapshot:
           $ref: '#/components/schemas/Snapshot'
+        statistics:
+          type: array
+          items:
+            $ref: '#/components/schemas/Partition'
     Snapshot:
       type: object
       properties:

Reply via email to