This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b91a3b0 spec: Implement ManifestList read functionality (#59)
b91a3b0 is described below
commit b91a3b043b8fe6ebcd6a606b20ab2ad15e4acd9f
Author: WenjunMin <[email protected]>
AuthorDate: Sun Sep 1 17:01:49 2024 +0800
spec: Implement ManifestList read functionality (#59)
---
crates/paimon/Cargo.toml | 1 +
crates/paimon/src/error.rs | 17 +++++
crates/paimon/src/spec/manifest_file_meta.rs | 80 +++++++++++---------
crates/paimon/src/spec/manifest_list.rs | 83 ++++++++++++++++++++-
...est-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 | Bin 0 -> 837 bytes
5 files changed, 143 insertions(+), 38 deletions(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 9e22e5b..e1c85fc 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -48,3 +48,4 @@ snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.48",features = ["services-fs"] }
pretty_assertions = "1"
+apache-avro = { version = "0.17", features = ["snappy"] }
\ No newline at end of file
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index f42b465..9294c64 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -52,6 +52,14 @@ pub enum Error {
display("Paimon hitting invalid config: {}", message)
)]
ConfigInvalid { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting unexpected avro error {}: {:?}", message,
source)
+ )]
+ DataUnexpected {
+ message: String,
+ source: apache_avro::Error,
+ },
}
impl From<opendal::Error> for Error {
@@ -63,3 +71,12 @@ impl From<opendal::Error> for Error {
}
}
}
+
+impl From<apache_avro::Error> for Error {
+ fn from(source: apache_avro::Error) -> Self {
+ Error::DataUnexpected {
+ message: "".to_string(),
+ source,
+ }
+ }
+}
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs
index 4f3775d..382d579 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use serde_bytes::Bytes;
+use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
/// Metadata of a manifest file.
@@ -24,6 +23,9 @@ use std::fmt::{Display, Formatter};
/// Impl Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java>
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct ManifestFileMeta {
+ #[serde(rename = "_VERSION")]
+ version: i32,
+
/// manifest file name
#[serde(rename = "_FILE_NAME")]
file_name: String,
@@ -84,6 +86,32 @@ impl ManifestFileMeta {
pub fn schema_id(&self) -> i64 {
self.schema_id
}
+
+ /// Get the version of this manifest file
+ #[inline]
+ pub fn version(&self) -> i32 {
+ self.version
+ }
+
+ #[inline]
+ pub fn new(
+ file_name: String,
+ file_size: i64,
+ num_added_files: i64,
+ num_deleted_files: i64,
+ partition_stats: BinaryTableStats,
+ schema_id: i64,
+ ) -> ManifestFileMeta {
+ Self {
+ version: 2,
+ file_name,
+ file_size,
+ num_added_files,
+ num_deleted_files,
+ partition_stats,
+ schema_id,
+ }
+ }
}
impl Display for ManifestFileMeta {
@@ -117,11 +145,7 @@ pub struct BinaryTableStats {
max_values: Vec<u8>,
/// the number of nulls of the columns
- #[serde(
- rename = "_NULL_COUNTS",
- serialize_with = "serialize_null_counts",
- deserialize_with = "deserialize_null_counts"
- )]
+ #[serde(rename = "_NULL_COUNTS")]
null_counts: Vec<i64>,
}
@@ -143,6 +167,18 @@ impl BinaryTableStats {
pub fn null_counts(&self) -> &Vec<i64> {
&self.null_counts
}
+
+ pub fn new(
+ min_values: Vec<u8>,
+ max_values: Vec<u8>,
+ null_counts: Vec<i64>,
+ ) -> BinaryTableStats {
+ Self {
+ min_values,
+ max_values,
+ null_counts,
+ }
+ }
}
impl Display for BinaryTableStats {
@@ -150,33 +186,3 @@ impl Display for BinaryTableStats {
todo!()
}
}
-
-fn serialize_null_counts<S>(value: &Vec<i64>, serializer: S) -> Result<S::Ok,
S::Error>
-where
- S: Serializer,
-{
- let mut bytes = Vec::new();
- for &num in value {
- bytes.extend_from_slice(&num.to_le_bytes());
- }
-
- let bytes = Bytes::new(bytes.as_slice());
- serializer.serialize_bytes(bytes)
-}
-
-fn deserialize_null_counts<'de, D>(deserializer: D) -> Result<Vec<i64>,
D::Error>
-where
- D: Deserializer<'de>,
-{
- let bytes = Deserialize::deserialize(deserializer).map(Bytes::new)?;
-
- let size_of_i64 = std::mem::size_of::<i64>();
- let i64_count = bytes.len() / size_of_i64;
- let mut i64s = Vec::with_capacity(i64_count);
- for chunk in bytes.chunks_exact(size_of_i64) {
- i64s.push(i64::from_le_bytes(
- chunk.try_into().expect("Chunk must be 8 bytes long"),
- ));
- }
- Ok(i64s)
-}
diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs
index a37e8c0..2cffd5c 100644
--- a/crates/paimon/src/spec/manifest_list.rs
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -16,15 +16,96 @@
// under the License.
use super::manifest_file_meta::ManifestFileMeta;
+use crate::io::FileIO;
+use crate::{Error, Result};
+use apache_avro::types::Value;
+use apache_avro::{from_value, Reader};
+use serde::{Deserialize, Serialize};
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(transparent)]
/// This file includes several [`ManifestFileMeta`], representing all data of
the whole table at the corresponding snapshot.
-pub struct ManifestList {}
+pub struct ManifestList {
+ entries: Vec<ManifestFileMeta>,
+}
impl ManifestList {
+ pub fn entries(&self) -> &Vec<ManifestFileMeta> {
+ &self.entries
+ }
+
+ pub fn from_avro_bytes(bytes: &[u8]) -> Result<ManifestList> {
+ let reader = Reader::new(bytes).map_err(Error::from)?;
+ let records = reader
+ .collect::<std::result::Result<Vec<Value>, _>>()
+ .map_err(Error::from)?;
+ let values = Value::Array(records);
+ from_value::<ManifestList>(&values).map_err(Error::from)
+ }
+}
+
+pub struct ManifestListFactory {
+ file_io: FileIO,
+}
+
+/// The factory to read and write [`ManifestList`]
+impl ManifestListFactory {
+ pub fn new(file_io: FileIO) -> ManifestListFactory {
+ Self { file_io }
+ }
+
/// Write several [`ManifestFileMeta`]s into a manifest list.
///
/// NOTE: This method is atomic.
pub fn write(&mut self, _metas: Vec<ManifestFileMeta>) -> &str {
todo!()
}
+
+ /// Read [`ManifestList`] from the manifest file.
+ pub async fn read(&self, path: &str) -> Result<ManifestList> {
+ let bs = self.file_io.new_input(path)?.read().await?;
+ // todo support other formats
+ ManifestList::from_avro_bytes(bs.as_ref())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList};
+
+ #[tokio::test]
+ async fn test_read_manifest_list() {
+ let workdir =
+ std::env::current_dir().unwrap_or_else(|err| panic!("current_dir
must exist: {err}"));
+ let path = workdir
+
.join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0");
+ let v = std::fs::read(path.to_str().unwrap()).unwrap();
+ let res = ManifestList::from_avro_bytes(&v).unwrap();
+ let value_bytes = vec![
+ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0,
0, 0, 0, 0, 0, 129,
+ ];
+ assert_eq!(
+ res,
+ ManifestList {
+ entries: vec![
+ ManifestFileMeta::new(
+
"manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(),
+ 10,
+ 10,
+ 10,
+ BinaryTableStats::new(value_bytes.clone(),
value_bytes.clone(), vec![1, 2]),
+ 1
+ ),
+ ManifestFileMeta::new(
+
"manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(),
+ 11,
+ 0,
+ 10,
+ BinaryTableStats::new(value_bytes.clone(),
value_bytes.clone(), vec![1, 2]),
+ 2
+ )
+ ],
+ }
+ );
+ }
}
diff --git a/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 b/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0
new file mode 100644
index 0000000..32e73c4
Binary files /dev/null and b/crates/paimon/tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0 differ