This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b91a3b0  spec: Implement ManifestList read functionality (#59)
b91a3b0 is described below

commit b91a3b043b8fe6ebcd6a606b20ab2ad15e4acd9f
Author: WenjunMin <[email protected]>
AuthorDate: Sun Sep 1 17:01:49 2024 +0800

    spec: Implement ManifestList read functionality (#59)
---
 crates/paimon/Cargo.toml                           |   1 +
 crates/paimon/src/error.rs                         |  17 +++++
 crates/paimon/src/spec/manifest_file_meta.rs       |  80 +++++++++++---------
 crates/paimon/src/spec/manifest_list.rs            |  83 ++++++++++++++++++++-
 ...est-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 | Bin 0 -> 837 bytes
 5 files changed, 143 insertions(+), 38 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 9e22e5b..e1c85fc 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -48,3 +48,4 @@ snafu = "0.8.3"
 typed-builder = "^0.19"
 opendal = { version = "0.48",features = ["services-fs"] }
 pretty_assertions = "1"
+apache-avro = { version = "0.17", features = ["snappy"] }
\ No newline at end of file
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index f42b465..9294c64 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -52,6 +52,14 @@ pub enum Error {
         display("Paimon hitting invalid config: {}", message)
     )]
     ConfigInvalid { message: String },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unexpected avro error {}: {:?}", message, source)
+    )]
+    DataUnexpected {
+        message: String,
+        source: apache_avro::Error,
+    },
 }
 
 impl From<opendal::Error> for Error {
@@ -63,3 +71,12 @@ impl From<opendal::Error> for Error {
         }
     }
 }
+
+impl From<apache_avro::Error> for Error {
+    fn from(source: apache_avro::Error) -> Self {
+        Error::DataUnexpected {
+            message: "".to_string(),
+            source,
+        }
+    }
+}
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs
index 4f3775d..382d579 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use serde_bytes::Bytes;
+use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
 
 /// Metadata of a manifest file.
@@ -24,6 +23,9 @@ use std::fmt::{Display, Formatter};
 /// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java>
 #[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
 pub struct ManifestFileMeta {
+    #[serde(rename = "_VERSION")]
+    version: i32,
+
     /// manifest file name
     #[serde(rename = "_FILE_NAME")]
     file_name: String,
@@ -84,6 +86,32 @@ impl ManifestFileMeta {
     pub fn schema_id(&self) -> i64 {
         self.schema_id
     }
+
+    /// Get the version of this manifest file
+    #[inline]
+    pub fn version(&self) -> i32 {
+        self.version
+    }
+
+    #[inline]
+    pub fn new(
+        file_name: String,
+        file_size: i64,
+        num_added_files: i64,
+        num_deleted_files: i64,
+        partition_stats: BinaryTableStats,
+        schema_id: i64,
+    ) -> ManifestFileMeta {
+        Self {
+            version: 2,
+            file_name,
+            file_size,
+            num_added_files,
+            num_deleted_files,
+            partition_stats,
+            schema_id,
+        }
+    }
 }
 
 impl Display for ManifestFileMeta {
@@ -117,11 +145,7 @@ pub struct BinaryTableStats {
     max_values: Vec<u8>,
 
     /// the number of nulls of the columns
-    #[serde(
-        rename = "_NULL_COUNTS",
-        serialize_with = "serialize_null_counts",
-        deserialize_with = "deserialize_null_counts"
-    )]
+    #[serde(rename = "_NULL_COUNTS")]
     null_counts: Vec<i64>,
 }
 
@@ -143,6 +167,18 @@ impl BinaryTableStats {
     pub fn null_counts(&self) -> &Vec<i64> {
         &self.null_counts
     }
+
+    pub fn new(
+        min_values: Vec<u8>,
+        max_values: Vec<u8>,
+        null_counts: Vec<i64>,
+    ) -> BinaryTableStats {
+        Self {
+            min_values,
+            max_values,
+            null_counts,
+        }
+    }
 }
 
 impl Display for BinaryTableStats {
@@ -150,33 +186,3 @@ impl Display for BinaryTableStats {
         todo!()
     }
 }
-
-fn serialize_null_counts<S>(value: &Vec<i64>, serializer: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer,
-{
-    let mut bytes = Vec::new();
-    for &num in value {
-        bytes.extend_from_slice(&num.to_le_bytes());
-    }
-
-    let bytes = Bytes::new(bytes.as_slice());
-    serializer.serialize_bytes(bytes)
-}
-
-fn deserialize_null_counts<'de, D>(deserializer: D) -> Result<Vec<i64>, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    let bytes = Deserialize::deserialize(deserializer).map(Bytes::new)?;
-
-    let size_of_i64 = std::mem::size_of::<i64>();
-    let i64_count = bytes.len() / size_of_i64;
-    let mut i64s = Vec::with_capacity(i64_count);
-    for chunk in bytes.chunks_exact(size_of_i64) {
-        i64s.push(i64::from_le_bytes(
-            chunk.try_into().expect("Chunk must be 8 bytes long"),
-        ));
-    }
-    Ok(i64s)
-}
diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs
index a37e8c0..2cffd5c 100644
--- a/crates/paimon/src/spec/manifest_list.rs
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -16,15 +16,96 @@
 // under the License.
 
 use super::manifest_file_meta::ManifestFileMeta;
+use crate::io::FileIO;
+use crate::{Error, Result};
+use apache_avro::types::Value;
+use apache_avro::{from_value, Reader};
+use serde::{Deserialize, Serialize};
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(transparent)]
 /// This file includes several [`ManifestFileMeta`], representing all data of the whole table at the corresponding snapshot.
-pub struct ManifestList {}
+pub struct ManifestList {
+    entries: Vec<ManifestFileMeta>,
+}
 
 impl ManifestList {
+    pub fn entries(&self) -> &Vec<ManifestFileMeta> {
+        &self.entries
+    }
+
+    pub fn from_avro_bytes(bytes: &[u8]) -> Result<ManifestList> {
+        let reader = Reader::new(bytes).map_err(Error::from)?;
+        let records = reader
+            .collect::<std::result::Result<Vec<Value>, _>>()
+            .map_err(Error::from)?;
+        let values = Value::Array(records);
+        from_value::<ManifestList>(&values).map_err(Error::from)
+    }
+}
+
+pub struct ManifestListFactory {
+    file_io: FileIO,
+}
+
+/// The factory to read and write [`ManifestList`]
+impl ManifestListFactory {
+    pub fn new(file_io: FileIO) -> ManifestListFactory {
+        Self { file_io }
+    }
+
     /// Write several [`ManifestFileMeta`]s into a manifest list.
     ///
     /// NOTE: This method is atomic.
     pub fn write(&mut self, _metas: Vec<ManifestFileMeta>) -> &str {
         todo!()
     }
+
+    /// Read [`ManifestList`] from the manifest file.
+    pub async fn read(&self, path: &str) -> Result<ManifestList> {
+        let bs = self.file_io.new_input(path)?.read().await?;
+        // todo support other formats
+        ManifestList::from_avro_bytes(bs.as_ref())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList};
+
+    #[tokio::test]
+    async fn test_read_manifest_list() {
+        let workdir =
+            std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
+        let path = workdir
+            .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0");
+        let v = std::fs::read(path.to_str().unwrap()).unwrap();
+        let res = ManifestList::from_avro_bytes(&v).unwrap();
+        let value_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
+        ];
+        assert_eq!(
+            res,
+            ManifestList {
+                entries: vec![
+                    ManifestFileMeta::new(
+                        "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(),
+                        10,
+                        10,
+                        10,
+                        BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]),
+                        1
+                    ),
+                    ManifestFileMeta::new(
+                        "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(),
+                        11,
+                        0,
+                        10,
+                        BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]),
+                        2
+                    )
+                ],
+            }
+        );
+    }
 }
diff --git a/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 b/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0
new file mode 100644
index 0000000..32e73c4
Binary files /dev/null and b/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 differ

Reply via email to