This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 18529b887 [core] fix totalRecordCount error when compact in primary
key table (#2009)
18529b887 is described below
commit 18529b8870aafb170186db77f64be31f91c925e5
Author: HZY <[email protected]>
AuthorDate: Thu Sep 14 12:04:16 2023 +0800
[core] fix totalRecordCount error when compact in primary key table (#2009)
---
.../src/main/java/org/apache/paimon/Snapshot.java | 15 ++++
.../paimon/operation/FileStoreCommitImpl.java | 8 +-
.../table/IncrementalTimeStampTableTest.java | 96 ++++++++++++++++++++++
3 files changed, 117 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 15ebf8e1d..0bbeda7d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -20,6 +20,7 @@ package org.apache.paimon;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -389,6 +390,20 @@ public class Snapshot {
return manifestEntries.stream().mapToLong(manifest ->
manifest.file().rowCount()).sum();
}
+ public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry ->
FileKind.ADD.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
+
+ public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry ->
FileKind.DELETE.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
+
public String toJson() {
return JsonSerdeUtil.toJson(this);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 59dee5358..45d943ee4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -640,8 +640,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
partitionType));
previousChangesListName = manifestList.write(newMetas);
+ // delta record count = rows in ADD entries minus rows in DELETE entries
+ long deltaRecordCount =
+ Snapshot.recordCountAdd(tableFiles) -
Snapshot.recordCountDelete(tableFiles);
+ long totalRecordCount = previousTotalRecordCount +
deltaRecordCount;
+
// write new changes into manifest files
- long deltaRecordCount = Snapshot.recordCount(tableFiles);
List<ManifestFileMeta> newChangesManifests =
manifestFile.write(tableFiles);
newMetas.addAll(newChangesManifests);
newChangesListName = manifestList.write(newChangesManifests);
@@ -672,7 +676,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
commitKind,
System.currentTimeMillis(),
logOffsets,
- previousTotalRecordCount + deltaRecordCount,
+ totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index dd348e5bc..629a74a44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -132,6 +133,101 @@ public class IncrementalTimeStampTableTest extends
TableTestBase {
GenericRow.of(2, 2, 1));
}
+ @Test
+ public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws
Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ SnapshotManager snapshotManager = new
SnapshotManager(LocalFileIO.create(), tablePath);
+
+ // snapshot 1: append
+ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1),
GenericRow.of(1, 3, 1));
+ Snapshot snapshot1 = snapshotManager.snapshot(1);
+
assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(3L);
+ assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L);
+ // snapshot 2: append
+ write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2),
GenericRow.of(1, 4, 1));
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+
assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+ assertThat(snapshot2.totalRecordCount()).isEqualTo(6L);
+ assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L);
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+ Snapshot snapshot3 = snapshotManager.snapshot(3);
+
assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+ assertThat(snapshot3.totalRecordCount()).isEqualTo(4L);
+ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+ System.out.println(snapshot3);
+ }
+
+ @Test
+ public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws
Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ SnapshotManager snapshotManager = new
SnapshotManager(LocalFileIO.create(), tablePath);
+
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1),
+ GenericRow.of(2, 2, 1));
+ Snapshot snapshot1 = snapshotManager.snapshot(1);
+
assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(5L);
+ assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L);
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 2, 2),
+ GenericRow.of(2, 3, 1));
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+
assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+ assertThat(snapshot2.totalRecordCount()).isEqualTo(10L);
+ assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L);
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+
+ Snapshot snapshot3 = snapshotManager.snapshot(3);
+
+
assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+ assertThat(snapshot3.totalRecordCount()).isEqualTo(8L);
+ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+ // snapshot 4: compact
+ compact(table, row(2), 0);
+
+ Snapshot snapshot4 = snapshotManager.snapshot(4);
+
+
assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount());
+ assertThat(snapshot4.totalRecordCount()).isEqualTo(7L);
+ assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L);
+ }
+
@Test
public void testAppendTable() throws Exception {
Identifier identifier = identifier("T");