This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5a5220d feat(spec): Add CommitKind in snapshot (#64)
5a5220d is described below
commit 5a5220d2737630f1f43a99a23f26ff97119b2894
Author: WenjunMin <[email protected]>
AuthorDate: Sun Sep 1 17:02:09 2024 +0800
feat(spec): Add CommitKind in snapshot (#64)
---
crates/paimon/src/spec/snapshot.rs | 293 +++++++--------------
.../fixtures/snapshot/snapshot-v3-none-field.json | 16 ++
.../tests/fixtures/snapshot/snapshot-v3.json | 17 ++
3 files changed, 128 insertions(+), 198 deletions(-)
diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs
index 44ffed2..28dc92d 100644
--- a/crates/paimon/src/spec/snapshot.rs
+++ b/crates/paimon/src/spec/snapshot.rs
@@ -19,6 +19,22 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use typed_builder::TypedBuilder;
+/// Type of changes in this snapshot.
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+pub enum CommitKind {
+ /// Changes flushed from the mem table.
+ APPEND,
+
+ /// Changes by compacting existing data files.
+ COMPACT,
+
+ /// Changes that clear up the whole partition and then add new records.
+ OVERWRITE,
+
+ /// Collect statistics.
+ ANALYZE,
+}
+
/// Snapshot for paimon.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.
@@ -35,9 +51,11 @@ pub struct Snapshot {
delta_manifest_list: String,
/// a manifest list recording all changelog produced in this snapshot
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
changelog_manifest_list: Option<String>,
/// a manifest recording all index files of this table
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
index_manifest: Option<String>,
/// user who committed this snapshot
commit_user: String,
@@ -49,10 +67,12 @@ pub struct Snapshot {
/// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
/// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
commit_identifier: i64,
+ commit_kind: CommitKind,
/// timestamp of this snapshot
time_millis: u64,
/// log offsets of all changes occurred in this snapshot
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
log_offsets: Option<HashMap<i32, i64>>,
/// record count of all changes occurred in this snapshot
#[builder(default = None)]
@@ -62,12 +82,15 @@ pub struct Snapshot {
delta_record_count: Option<i64>,
/// record count of all changelog produced in this snapshot
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
changelog_record_count: Option<i64>,
/// watermark for input records
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
watermark: Option<i64>,
/// stats file name for statistics of this table
#[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
statistics: Option<String>,
}
@@ -172,213 +195,87 @@ impl Snapshot {
#[cfg(test)]
mod tests {
use super::*;
+ use pretty_assertions::assert_eq;
use serde_json;
+ use std::env::current_dir;
- #[test]
- fn test_snapshot_creation() {
- let snapshot = Snapshot::builder()
- .version(3)
- .id(1)
- .schema_id(0)
-
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
- .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
- .commit_identifier(9223372036854775807)
- .time_millis(1721287833568)
- .build();
-
- assert_eq!(snapshot.version(), 3);
- assert_eq!(snapshot.id(), 1);
- assert_eq!(snapshot.schema_id(), 0);
- assert_eq!(
- snapshot.base_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
- );
- assert_eq!(
- snapshot.delta_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
- );
- assert_eq!(
- snapshot.commit_user(),
- "cf568e07-05ad-4943-b4bd-37461bc58729"
- );
- assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
- assert_eq!(snapshot.time_millis(), 1721287833568);
- assert!(snapshot.changelog_manifest_list().is_none());
- assert!(snapshot.index_manifest().is_none());
- assert!(snapshot.log_offsets().is_none());
- assert!(snapshot.total_record_count().is_none());
- assert!(snapshot.delta_record_count().is_none());
- assert!(snapshot.changelog_record_count().is_none());
- assert!(snapshot.watermark().is_none());
- assert!(snapshot.statistics().is_none());
+ fn load_fixture(name: &str) -> String {
+ let path = current_dir()
+ .unwrap_or_else(|err| panic!("current_dir must exist: {err}"))
+ .join(format!("tests/fixtures/snapshot/{name}.json"));
+ let bytes = std::fs::read(&path)
+ .unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}"));
+ String::from_utf8(bytes).expect("fixtures content must be valid utf8")
}
- #[test]
- fn test_snapshot_with_optional_fields() {
- let snapshot = Snapshot::builder()
- .version(3)
- .id(1)
- .schema_id(0)
-
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
- .changelog_manifest_list(Some(
-
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(),
- ))
- .index_manifest(Some(
-
"index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(),
- ))
- .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
- .commit_identifier(9223372036854775807)
- .time_millis(1721287833568)
- .total_record_count(Some(1))
- .delta_record_count(Some(1))
- .changelog_record_count(Some(0))
- .watermark(Some(-9223372036854775808))
- .statistics(Some("statistics_v2".to_string()))
- .build();
-
- assert_eq!(snapshot.version(), 3);
- assert_eq!(snapshot.id(), 1);
- assert_eq!(snapshot.schema_id(), 0);
- assert_eq!(
- snapshot.base_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
- );
- assert_eq!(
- snapshot.delta_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
- );
- assert_eq!(
- snapshot.changelog_manifest_list(),
- Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
- );
- assert_eq!(
- snapshot.index_manifest(),
- Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
- );
- assert_eq!(
- snapshot.commit_user(),
- "cf568e07-05ad-4943-b4bd-37461bc58729"
- );
- assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
- assert_eq!(snapshot.time_millis(), 1721287833568);
- assert_eq!(
- snapshot.changelog_manifest_list(),
- Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2")
- );
- assert_eq!(snapshot.total_record_count(), Some(1));
- assert_eq!(snapshot.delta_record_count(), Some(1));
- assert_eq!(snapshot.changelog_record_count(), Some(0));
- assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
- assert_eq!(snapshot.statistics(), Some("statistics_v2"));
- }
-
- #[test]
- fn test_snapshot_with_none_fields() {
- let snapshot = Snapshot::builder()
- .version(3)
- .id(1)
- .schema_id(0)
-
.base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string())
-
.delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string())
- .changelog_manifest_list(None)
- .index_manifest(None)
- .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string())
- .commit_identifier(9223372036854775807)
- .time_millis(1721287833568)
- .total_record_count(None)
- .delta_record_count(None)
- .changelog_record_count(None)
- .watermark(None)
- .statistics(None)
- .build();
-
- assert_eq!(snapshot.version(), 3);
- assert_eq!(snapshot.id(), 1);
- assert_eq!(snapshot.schema_id(), 0);
- assert_eq!(
- snapshot.base_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
- );
- assert_eq!(
- snapshot.delta_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
- );
- assert_eq!(
- snapshot.commit_user(),
- "cf568e07-05ad-4943-b4bd-37461bc58729"
- );
- assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
- assert_eq!(snapshot.time_millis(), 1721287833568);
- assert!(snapshot.changelog_manifest_list().is_none());
- assert!(snapshot.index_manifest().is_none());
- assert!(snapshot.log_offsets().is_none());
- assert!(snapshot.total_record_count().is_none());
- assert!(snapshot.delta_record_count().is_none());
- assert!(snapshot.changelog_record_count().is_none());
- assert!(snapshot.watermark().is_none());
- assert!(snapshot.statistics().is_none());
+ fn test_cases() -> Vec<(&'static str, Snapshot)> {
+ vec![
+ (
+ "snapshot-v3",
+ Snapshot::builder()
+ .version(3)
+ .id(2)
+ .schema_id(0)
+ .base_manifest_list(
+
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
+ )
+ .delta_manifest_list(
+
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
+ )
+
.commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
+ .commit_identifier(9223372036854775807)
+ .changelog_manifest_list(Some(
+
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2".to_string(),
+ ))
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(1724509030368)
+ .log_offsets(Some(HashMap::default()))
+ .total_record_count(Some(4))
+ .delta_record_count(Some(2))
+ .changelog_record_count(Some(2))
+ .statistics(Some("statistics_string".to_string()))
+ .build(),
+ ),
+ (
+ "snapshot-v3-none-field",
+ Snapshot::builder()
+ .version(3)
+ .id(2)
+ .schema_id(0)
+ .base_manifest_list(
+
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0".to_string(),
+ )
+ .delta_manifest_list(
+
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1".to_string(),
+ )
+
.commit_user("abbaac9e-4a17-43e3-b135-2269da263e3a".to_string())
+ .commit_identifier(9223372036854775807)
+ .changelog_manifest_list(None)
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(1724509030368)
+ .log_offsets(Some(HashMap::default()))
+ .total_record_count(Some(4))
+ .delta_record_count(Some(2))
+ .changelog_record_count(Some(2))
+ .build(),
+ ),
+ ]
}
#[test]
fn test_snapshot_serialization_deserialization() {
- let data = r#"
- {
- "version" : 3,
- "id" : 1,
- "schemaId" : 0,
- "baseManifestList" :
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0",
- "deltaManifestList" :
"manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1",
- "changelogManifestList" : null,
- "indexManifest" :
"index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0",
- "commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729",
- "commitIdentifier" : 9223372036854775807,
- "timeMillis" : 1721287833568,
- "logOffsets" : { },
- "totalRecordCount" : 1,
- "deltaRecordCount" : 1,
- "changelogRecordCount" : 0,
- "watermark" : -9223372036854775808
- }
- "#;
-
- let snapshot: Snapshot =
- serde_json::from_str(data).expect("Failed to deserialize
Snapshot");
-
- assert_eq!(snapshot.version(), 3);
- assert_eq!(snapshot.id(), 1);
- assert_eq!(snapshot.schema_id(), 0);
- assert_eq!(
- snapshot.base_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0"
- );
- assert_eq!(
- snapshot.delta_manifest_list(),
- "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1"
- );
- assert_eq!(snapshot.changelog_manifest_list(), None);
- assert_eq!(
- snapshot.index_manifest(),
- Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0")
- );
- assert_eq!(
- snapshot.commit_user(),
- "cf568e07-05ad-4943-b4bd-37461bc58729"
- );
- assert_eq!(snapshot.commit_identifier(), 9223372036854775807);
- assert_eq!(snapshot.time_millis(), 1721287833568);
- assert!(snapshot.changelog_manifest_list().is_none());
- assert_eq!(snapshot.total_record_count(), Some(1));
- assert_eq!(snapshot.delta_record_count(), Some(1));
- assert_eq!(snapshot.changelog_record_count(), Some(0));
- assert_eq!(snapshot.watermark(), Some(-9223372036854775808));
-
- let serialized = serde_json::to_string(&snapshot).expect("Failed to
serialize Snapshot");
+ for (name, expect) in test_cases() {
+ let content = load_fixture(name);
+ let snapshot: Snapshot =
+ serde_json::from_str(content.as_str()).expect("Failed to deserialize Snapshot");
+ assert_eq!(snapshot, expect);
+ let serialized =
+ serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot");
- let deserialized: Snapshot =
- serde_json::from_str(&serialized).expect("Failed to deserialize
serialized Snapshot");
+ let deserialized: Snapshot = serde_json::from_str(&serialized)
+ .expect("Failed to deserialize serialized Snapshot");
- assert_eq!(snapshot, deserialized);
+ assert_eq!(snapshot, deserialized);
+ }
}
}
diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json
new file mode 100644
index 0000000..c853167
--- /dev/null
+++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3-none-field.json
@@ -0,0 +1,16 @@
+{
+ "version": 3,
+ "id": 2,
+ "schemaId": 0,
+ "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
+ "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
+ "changelogManifestList": null,
+ "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
+ "commitIdentifier": 9223372036854775807,
+ "commitKind": "APPEND",
+ "timeMillis": 1724509030368,
+ "logOffsets": {},
+ "totalRecordCount": 4,
+ "deltaRecordCount": 2,
+ "changelogRecordCount": 2
+}
\ No newline at end of file
diff --git a/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json
new file mode 100644
index 0000000..894c984
--- /dev/null
+++ b/crates/paimon/tests/fixtures/snapshot/snapshot-v3.json
@@ -0,0 +1,17 @@
+{
+ "version": 3,
+ "id": 2,
+ "schemaId": 0,
+ "baseManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-0",
+ "deltaManifestList": "manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-1",
+ "changelogManifestList":
"manifest-list-ea4b892d-edc8-4ee7-9eee-7068b83a947b-2",
+ "commitUser": "abbaac9e-4a17-43e3-b135-2269da263e3a",
+ "commitIdentifier": 9223372036854775807,
+ "commitKind": "APPEND",
+ "timeMillis": 1724509030368,
+ "logOffsets": {},
+ "totalRecordCount": 4,
+ "deltaRecordCount": 2,
+ "changelogRecordCount": 2,
+ "statistics": "statistics_string"
+}
\ No newline at end of file