This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 18529b887 [core] fix totalRecordCount error when compact in primary key table (#2009)
18529b887 is described below

commit 18529b8870aafb170186db77f64be31f91c925e5
Author: HZY <[email protected]>
AuthorDate: Thu Sep 14 12:04:16 2023 +0800

    [core] fix totalRecordCount error when compact in primary key table (#2009)
---
 .../src/main/java/org/apache/paimon/Snapshot.java  | 15 ++++
 .../paimon/operation/FileStoreCommitImpl.java      |  8 +-
 .../table/IncrementalTimeStampTableTest.java       | 96 ++++++++++++++++++++++
 3 files changed, 117 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 15ebf8e1d..0bbeda7d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -20,6 +20,7 @@ package org.apache.paimon;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
@@ -389,6 +390,20 @@ public class Snapshot {
         return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
     }
 
+    public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+        return manifestEntries.stream()
+                .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
+                .mapToLong(manifest -> manifest.file().rowCount())
+                .sum();
+    }
+
+    public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+        return manifestEntries.stream()
+                .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind()))
+                .mapToLong(manifest -> manifest.file().rowCount())
+                .sum();
+    }
+
     public String toJson() {
         return JsonSerdeUtil.toJson(this);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 59dee5358..45d943ee4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -640,8 +640,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             partitionType));
             previousChangesListName = manifestList.write(newMetas);
 
+            // the added records subtract the deleted records from
+            long deltaRecordCount =
+                    Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles);
+            long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+
             // write new changes into manifest files
-            long deltaRecordCount = Snapshot.recordCount(tableFiles);
             List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
             newMetas.addAll(newChangesManifests);
             newChangesListName = manifestList.write(newChangesManifests);
@@ -672,7 +676,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             commitKind,
                             System.currentTimeMillis(),
                             logOffsets,
-                            previousTotalRecordCount + deltaRecordCount,
+                            totalRecordCount,
                             deltaRecordCount,
                             Snapshot.recordCount(changelogFiles),
                             currentWatermark);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index dd348e5bc..629a74a44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -132,6 +133,101 @@ public class IncrementalTimeStampTableTest extends TableTestBase {
                         GenericRow.of(2, 2, 1));
     }
 
+    @Test
+    public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
+        SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+
+        // snapshot 1: append
+        write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1));
+        Snapshot snapshot1 = snapshotManager.snapshot(1);
+        assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+        assertThat(snapshot1.totalRecordCount()).isEqualTo(3L);
+        assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L);
+        // snapshot 2: append
+        write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1));
+        Snapshot snapshot2 = snapshotManager.snapshot(2);
+        assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+        assertThat(snapshot2.totalRecordCount()).isEqualTo(6L);
+        assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L);
+        // snapshot 3: compact
+        compact(table, row(1), 0);
+        Snapshot snapshot3 = snapshotManager.snapshot(3);
+        assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+        assertThat(snapshot3.totalRecordCount()).isEqualTo(4L);
+        assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+        System.out.println(snapshot3);
+    }
+
+    @Test
+    public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
+        SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1),
+                GenericRow.of(2, 2, 1));
+        Snapshot snapshot1 = snapshotManager.snapshot(1);
+        assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+        assertThat(snapshot1.totalRecordCount()).isEqualTo(5L);
+        assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L);
+        // snapshot 2: append
+        write(
+                table,
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(1, 2, 2),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 2, 2),
+                GenericRow.of(2, 3, 1));
+        Snapshot snapshot2 = snapshotManager.snapshot(2);
+        assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+        assertThat(snapshot2.totalRecordCount()).isEqualTo(10L);
+        assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L);
+        // snapshot 3: compact
+        compact(table, row(1), 0);
+
+        Snapshot snapshot3 = snapshotManager.snapshot(3);
+
+        assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+        assertThat(snapshot3.totalRecordCount()).isEqualTo(8L);
+        assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+        // snapshot 4: compact
+        compact(table, row(2), 0);
+
+        Snapshot snapshot4 = snapshotManager.snapshot(4);
+
+        assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount());
+        assertThat(snapshot4.totalRecordCount()).isEqualTo(7L);
+        assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L);
+    }
+
     @Test
     public void testAppendTable() throws Exception {
         Identifier identifier = identifier("T");

Reply via email to