This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a5220d  feat(spec): Add CommitKind in snapshot (#64)
5a5220d is described below

commit 5a5220d2737630f1f43a99a23f26ff97119b2894
Author: WenjunMin <[email protected]>
AuthorDate: Sun Sep 1 17:02:09 2024 +0800

    feat(spec): Add CommitKind in snapshot (#64)
---
 crates/paimon/src/spec/snapshot.rs                 | 293 +++++++--------------
 .../fixtures/snapshot/snapshot-v3-none-field.json  |  16 ++
 .../tests/fixtures/snapshot/snapshot-v3.json       |  17 ++
 3 files changed, 128 insertions(+), 198 deletions(-)

diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs
index 44ffed2..28dc92d 100644
--- a/crates/paimon/src/spec/snapshot.rs
+++ b/crates/paimon/src/spec/snapshot.rs
@@ -19,6 +19,22 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use typed_builder::TypedBuilder;
 
+/// Type of changes in this snapshot.
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+pub enum CommitKind {
+    /// Changes flushed from the mem table.
+    APPEND,
+
+    /// Changes by compacting existing data files.
+    COMPACT,
+
+    /// Changes that clear up the whole partition and then add new records.
+    OVERWRITE,
+
+    /// Collect statistics.
+    ANALYZE,
+}
+
 /// Snapshot for paimon.
 ///
 /// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.
@@ -35,9 +51,11 @@ pub struct Snapshot {
     delta_manifest_list: String,
     /// a manifest list recording all changelog produced in this snapshot
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     changelog_manifest_list: Option<String>,
     /// a manifest recording all index files of this table
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     index_manifest: Option<String>,
     /// user who committed this snapshot
     commit_user: String,
@@ -49,10 +67,12 @@ pub struct Snapshot {
     /// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
     /// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
     commit_identifier: i64,
+    commit_kind: CommitKind,
     /// timestamp of this snapshot
     time_millis: u64,
     /// log offsets of all changes occurred in this snapshot
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     log_offsets: Option<HashMap<i32, i64>>,
     /// record count of all changes occurred in this snapshot
     #[builder(default = None)]
@@ -62,12 +82,15 @@ pub struct Snapshot {
     delta_record_count: Option<i64>,
     /// record count of all changelog produced in this snapshot
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     changelog_record_count: Option<i64>,
     /// watermark for input records
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     watermark: Option<i64>,
     /// stats file name for statistics of this table
     #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
     statistics: Option<String>,
 }
 
@@ -172,213 +195,87 @@ impl Snapshot {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use pretty_assertions::assert_eq;
     use serde_json;
+    use std::env::current_dir;
 
-    #[test]
-    fn test_snapshot_creation() {
-        let snapshot = Snapshot::builder()
-            .version(3)
-            .id(1)
-            .schema_id(0)
-            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
-            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
-            .commit_identifier(9223372036854775807)
-            .time_millis(1721287833568)
-            .build();
-
-        assert_eq!(snapshot.version(), 3);
-        assert_eq!(snapshot.id(), 1);
-        assert_eq!(snapshot.schema_id(), 0);
-        assert_eq!(
-            snapshot.base_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
-        );
-        assert_eq!(
-            snapshot.delta_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
-        );
-        assert_eq!(
-            snapshot.commit_user(),
-            "cf568e07-05ad-4943-b4bd-37461bc58729"
-        );
-        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
-        assert_eq!(snapshot.time_millis(), 1721287833568);
-        assert!(snapshot.changelog_manifest_list().is_none());
-        assert!(snapshot.index_manifest().is_none());
-        assert!(snapshot.log_offsets().is_none());
-        assert!(snapshot.total_record_count().is_none());
-        assert!(snapshot.delta_record_count().is_none());
-        assert!(snapshot.changelog_record_count().is_none());
-        assert!(snapshot.watermark().is_none());
-        assert!(snapshot.statistics().is_none());
+    fn load_fixture(name: &str) -> String {
+        let path = current_dir()
+            .unwrap_or_else(|err| panic!("current_dir must exist: {err}"))
+            .join(format!("tests/fixtures/snapshot/{name}.json"));
+        let bytes = std::fs::read(&path)
+            .unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}"));
+        String::from_utf8(bytes).expect("fixtures content must be valid utf8")
     }
 
-    #[test]
-    fn test_snapshot_with_optional_fields() {
-        let snapshot = Snapshot::builder()
-            .version(3)
-            .id(1)
-            .schema_id(0)
-            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
-            .changelog_manifest_list(Some(
-                "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(),
-            ))
-            .index_manifest(Some(
-                "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(),
-            ))
-            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
-            .commit_identifier(9223372036854775807)
-            .time_millis(1721287833568)
-            .total_record_count(Some(1))
-            .delta_record_count(Some(1))
-            .changelog_record_count(Some(0))
-            .watermark(Some(-9223372036854775808))
-            .statistics(Some("statistics_v2".to_string()))
-            .build();
-
-        assert_eq!(snapshot.version(), 3);
-        assert_eq!(snapshot.id(), 1);
-        assert_eq!(snapshot.schema_id(), 0);
-        assert_eq!(
-            snapshot.base_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
-        );
-        assert_eq!(
-            snapshot.delta_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
-        );
-        assert_eq!(
-            snapshot.changelog_manifest_list(),
-            Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
-        );
-        assert_eq!(
-            snapshot.index_manifest(),
-            Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
-        );
-        assert_eq!(
-            snapshot.commit_user(),
-            "cf568e07-05ad-4943-b4bd-37461bc58729"
-        );
-        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
-        assert_eq!(snapshot.time_millis(), 1721287833568);
-        assert_eq!(
-            snapshot.changelog_manifest_list(),
-            Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
-        );
-        assert_eq!(snapshot.total_record_count(), Some(1));
-        assert_eq!(snapshot.delta_record_count(), Some(1));
-        assert_eq!(snapshot.changelog_record_count(), Some(0));
-        assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
-        assert_eq!(snapshot.statistics(), Some("statistics_v2"));
-    }
-
-    #[test]
-    fn test_snapshot_with_none_fields() {
-        let snapshot = Snapshot::builder()
-            .version(3)
-            .id(1)
-            .schema_id(0)
-            .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-            .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
-            .changelog_manifest_list(None)
-            .index_manifest(None)
-            .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
-            .commit_identifier(9223372036854775807)
-            .time_millis(1721287833568)
-            .total_record_count(None)
-            .delta_record_count(None)
-            .changelog_record_count(None)
-            .watermark(None)
-            .statistics(None)
-            .build();
-
-        assert_eq!(snapshot.version(), 3);
-        assert_eq!(snapshot.id(), 1);
-        assert_eq!(snapshot.schema_id(), 0);
-        assert_eq!(
-            snapshot.base_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
-        );
-        assert_eq!(
-            snapshot.delta_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
-        );
-        assert_eq!(
-            snapshot.commit_user(),
-            "cf568e07-05ad-4943-b4bd-37461bc58729"
-        );
-        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
-        assert_eq!(snapshot.time_millis(), 1721287833568);
-        assert!(snapshot.changelog_manifest_list().is_none());
-        assert!(snapshot.index_manifest().is_none());
-        assert!(snapshot.log_offsets().is_none());
-        assert!(snapshot.total_record_count().is_none());
-        assert!(snapshot.delta_record_count().is_none());
-        assert!(snapshot.changelog_record_count().is_none());
-        assert!(snapshot.watermark().is_none());
-        assert!(snapshot.statistics().is_none());
+    fn test_cases() -> Vec<(&'static str, Snapshot)> {
+        vec![
+            (
+                "snapshot-v3",
+                Snapshot::builder()
+                    .version(3)
+                    .id(2)
+                    .schema_id(0)
+                    .base_manifest_list(
+                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
+                    )
+                    .delta_manifest_list(
+                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
+                    )
+                    .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
+                    .commit_identifier(9223372036854775807)
+                    .changelog_manifest_list(Some(
+                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2".to_string(),
+                    ))
+                    .commit_kind(CommitKind::APPEND)
+                    .time_millis(1724509030368)
+                    .log_offsets(Some(HashMap::default()))
+                    .total_record_count(Some(4))
+                    .delta_record_count(Some(2))
+                    .changelog_record_count(Some(2))
+                    .statistics(Some("statistics_string".to_string()))
+                    .build(),
+            ),
+            (
+                "snapshot-v3-none-field",
+                Snapshot::builder()
+                    .version(3)
+                    .id(2)
+                    .schema_id(0)
+                    .base_manifest_list(
+                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
+                    )
+                    .delta_manifest_list(
+                        "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
+                    )
+                    .commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
+                    .commit_identifier(9223372036854775807)
+                    .changelog_manifest_list(None)
+                    .commit_kind(CommitKind::APPEND)
+                    .time_millis(1724509030368)
+                    .log_offsets(Some(HashMap::default()))
+                    .total_record_count(Some(4))
+                    .delta_record_count(Some(2))
+                    .changelog_record_count(Some(2))
+                    .build(),
+            ),
+        ]
     }
 
     #[test]
     fn test_snapshot_serialization_deserialization() {
-        let data = r#"
-    {
-      "version" : 3,
-      "id" : 1,
-      "schemaId" : 0,
-      "baseManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0",
-      "deltaManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1",
-      "changelogManifestList" : null,
-      "indexManifest" : "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0",
-      "commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729",
-      "commitIdentifier" : 9223372036854775807,
-      "timeMillis" : 1721287833568,
-      "logOffsets" : { },
-      "totalRecordCount" : 1,
-      "deltaRecordCount" : 1,
-      "changelogRecordCount" : 0,
-      "watermark" : -9223372036854775808
-    }
-    "#;
-
-        let snapshot: Snapshot =
-            serde_json::from_str(data).expect("Failed to deserialize Snapshot");
-
-        assert_eq!(snapshot.version(), 3);
-        assert_eq!(snapshot.id(), 1);
-        assert_eq!(snapshot.schema_id(), 0);
-        assert_eq!(
-            snapshot.base_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
-        );
-        assert_eq!(
-            snapshot.delta_manifest_list(),
-            "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
-        );
-        assert_eq!(snapshot.changelog_manifest_list(), None);
-        assert_eq!(
-            snapshot.index_manifest(),
-            Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
-        );
-        assert_eq!(
-            snapshot.commit_user(),
-            "cf568e07-05ad-4943-b4bd-37461bc58729"
-        );
-        assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
-        assert_eq!(snapshot.time_millis(), 1721287833568);
-        assert!(snapshot.changelog_manifest_list().is_none());
-        assert_eq!(snapshot.total_record_count(), Some(1));
-        assert_eq!(snapshot.delta_record_count(), Some(1));
-        assert_eq!(snapshot.changelog_record_count(), Some(0));
-        assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
-
-        let serialized = serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");
+        for (name, expect) in test_cases() {
+            let content = load_fixture(name);
+            let snapshot: Snapshot =
+                serde_json::from_str(content.as_str()).expect("Failed to deserialize Snapshot");
+            assert_eq!(snapshot, expect);
+            let serialized =
+                serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");
 
-        let deserialized: Snapshot =
-            serde_json::from_str(&serialized).expect("Failed to deserialize serialized Snapshot");
+            let deserialized: Snapshot = serde_json::from_str(&serialized)
+                .expect("Failed to deserialize serialized Snapshot");
 
-        assert_eq!(snapshot, deserialized);
+            assert_eq!(snapshot, deserialized);
+        }
     }
 }
diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json
new file mode 100644
index 0000000..c853167
--- /dev/null
+++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json
@@ -0,0 +1,16 @@
+{
+  "version": 3,
+  "id": 2,
+  "schemaId": 0,
+  "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
+  "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
+  "changelogManifestList": null,
+  "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
+  "commitIdentifier": 9223372036854775807,
+  "commitKind": "APPEND",
+  "timeMillis": 1724509030368,
+  "logOffsets": {},
+  "totalRecordCount": 4,
+  "deltaRecordCount": 2,
+  "changelogRecordCount": 2
+}
\ No newline at end of file
diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json
new file mode 100644
index 0000000..894c984
--- /dev/null
+++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json
@@ -0,0 +1,17 @@
+{
+  "version": 3,
+  "id": 2,
+  "schemaId": 0,
+  "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
+  "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
+  "changelogManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2",
+  "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
+  "commitIdentifier": 9223372036854775807,
+  "commitKind": "APPEND",
+  "timeMillis": 1724509030368,
+  "logOffsets": {},
+  "totalRecordCount": 4,
+  "deltaRecordCount": 2,
+  "changelogRecordCount": 2,
+  "statistics": "statistics_string"
+}
\ No newline at end of file

Reply via email to