This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e2ba1a4 refactor: remove support for manifest list format as a list of
file paths (#201)
e2ba1a4 is described below
commit e2ba1a457dd46987c81a48016e7e9796bf613b14
Author: Yue Deng <[email protected]>
AuthorDate: Mon Feb 19 17:27:54 2024 +0800
refactor: remove support for manifest list format as a list of file paths
(#201)
* refactor: remove support for manifest list format as a list of file
paths (#158)
* refactor: add field definition to manifest list
* refactor: delete duplicated function
* refactor: fix duplicate function name
---
crates/catalog/rest/src/catalog.rs | 3 +-
crates/iceberg/src/catalog/mod.rs | 3 +-
crates/iceberg/src/scan.rs | 2 +-
crates/iceberg/src/spec/snapshot.rs | 131 ++++++++++--------------------
crates/iceberg/src/spec/table_metadata.rs | 16 ++--
5 files changed, 54 insertions(+), 101 deletions(-)
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index 7ccd108..5224a85 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -718,7 +718,6 @@ mod _serde {
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
- use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType,
Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform,
Type,
@@ -1146,7 +1145,7 @@ mod tests {
assert_eq!(vec![&Arc::new(Snapshot::builder()
.with_snapshot_id(3497810964824022504)
.with_timestamp_ms(1646787054459)
-
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
+
.with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro")
.with_sequence_number(0)
.with_schema_id(0)
.with_summary(Summary {
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index b688375..93d6a84 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -429,7 +429,6 @@ pub enum TableUpdate {
#[cfg(test)]
mod tests {
- use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType,
Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, Summary,
@@ -911,7 +910,7 @@ mod tests {
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
-
.with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string()))
+ .with_manifest_list("s3://a/b/2.avro")
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index a94351f..0a3b9a9 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -403,7 +403,7 @@ mod tests {
fixture
.table
.file_io()
-
.new_output(current_snapshot.manifest_list_file_path().unwrap())
+ .new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
diff --git a/crates/iceberg/src/spec/snapshot.rs
b/crates/iceberg/src/spec/snapshot.rs
index 781b757..3b4558b 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -84,7 +84,9 @@ pub struct Snapshot {
timestamp_ms: i64,
/// The location of a manifest list for this snapshot that
/// tracks manifest files with additional metadata.
- manifest_list: ManifestListLocation,
+ /// Currently we only support manifest list file, and manifest files are
not supported.
+ #[builder(setter(into))]
+ manifest_list: String,
/// A string map that summarizes the snapshot changes, including operation.
summary: Summary,
/// ID of the table’s current schema when the snapshot was created.
@@ -92,16 +94,6 @@ pub struct Snapshot {
schema_id: Option<SchemaId>,
}
-/// Type to distinguish between a path to a manifestlist file or a vector of
manifestfile locations
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
-#[serde(untagged)]
-pub enum ManifestListLocation {
- /// Location of manifestlist file
- ManifestListFile(String),
- /// Manifestfile locations
- ManifestFiles(Vec<String>),
-}
-
impl Snapshot {
/// Get the id of the snapshot
#[inline]
@@ -122,23 +114,10 @@ impl Snapshot {
}
/// Get location of manifest_list file
#[inline]
- pub fn manifest_list(&self) -> &ManifestListLocation {
+ pub fn manifest_list(&self) -> &str {
&self.manifest_list
}
- /// Return the manifest list file path.
- ///
- /// It will return an error if the manifest list is not a file but a list
of manifest file paths.
- #[inline]
- pub fn manifest_list_file_path(&self) -> Result<&str> {
- match &self.manifest_list {
- ManifestListLocation::ManifestListFile(s) => Ok(s),
- _ => Err(Error::new(
- ErrorKind::DataInvalid,
- "Manifest list is not a file but a list of manifest files.",
- )),
- }
- }
/// Get summary of the snapshot
#[inline]
pub fn summary(&self) -> &Summary {
@@ -187,31 +166,28 @@ impl Snapshot {
file_io: &FileIO,
table_metadata: &TableMetadata,
) -> Result<ManifestList> {
- match &self.manifest_list {
- ManifestListLocation::ManifestListFile(file) => {
- let mut manifest_list_content= Vec::new();
- file_io
- .new_input(file)?
- .reader().await?
- .read_to_end(&mut manifest_list_content)
- .await?;
-
- let schema = self.schema(table_metadata)?;
-
- let partition_type_provider = |partition_spec_id: i32| ->
Result<Option<StructType>> {
-
table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| {
- partition_spec.partition_type(&schema)
- }).transpose()
- };
-
- ManifestList::parse_with_version(&manifest_list_content,
table_metadata.format_version(),
- partition_type_provider, )
- }
- ManifestListLocation::ManifestFiles(_) => Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Loading manifests from `manifests` is currently not
supported, we only support loading from `manifest-list` file, see
https://iceberg.apache.org/spec/#snapshots for more information.",
- )),
- }
+ let mut manifest_list_content = Vec::new();
+ file_io
+ .new_input(&self.manifest_list)?
+ .reader()
+ .await?
+ .read_to_end(&mut manifest_list_content)
+ .await?;
+
+ let schema = self.schema(table_metadata)?;
+
+ let partition_type_provider = |partition_spec_id: i32| ->
Result<Option<StructType>> {
+ table_metadata
+ .partition_spec_by_id(partition_spec_id)
+ .map(|partition_spec| partition_spec.partition_type(&schema))
+ .transpose()
+ };
+
+ ManifestList::parse_with_version(
+ &manifest_list_content,
+ table_metadata.format_version(),
+ partition_type_provider,
+ )
}
pub(crate) fn log(&self) -> SnapshotLog {
@@ -232,9 +208,9 @@ pub(super) mod _serde {
use serde::{Deserialize, Serialize};
use crate::spec::SchemaId;
- use crate::{Error, ErrorKind};
+ use crate::Error;
- use super::{ManifestListLocation, Operation, Snapshot, Summary};
+ use super::{Operation, Snapshot, Summary};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
@@ -276,7 +252,7 @@ pub(super) mod _serde {
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
- manifest_list:
ManifestListLocation::ManifestListFile(v2.manifest_list),
+ manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
@@ -286,17 +262,14 @@ pub(super) mod _serde {
impl From<Snapshot> for SnapshotV2 {
fn from(v2: Snapshot) -> Self {
SnapshotV2 {
- snapshot_id: v2.snapshot_id,
- parent_snapshot_id: v2.parent_snapshot_id,
- sequence_number: v2.sequence_number,
- timestamp_ms: v2.timestamp_ms,
- manifest_list: match v2.manifest_list {
- ManifestListLocation::ManifestListFile(file) => file,
- ManifestListLocation::ManifestFiles(_) => panic!("Wrong table
format version. Can't convert a list of manifest files into a location of a
manifest file.")
- },
- summary: v2.summary,
- schema_id: v2.schema_id,
- }
+ snapshot_id: v2.snapshot_id,
+ parent_snapshot_id: v2.parent_snapshot_id,
+ sequence_number: v2.sequence_number,
+ timestamp_ms: v2.timestamp_ms,
+ manifest_list: v2.manifest_list,
+ summary: v2.summary,
+ schema_id: v2.schema_id,
+ }
}
}
@@ -310,15 +283,10 @@ pub(super) mod _serde {
sequence_number: 0,
timestamp_ms: v1.timestamp_ms,
manifest_list: match (v1.manifest_list, v1.manifests) {
- (Some(file), _) =>
ManifestListLocation::ManifestListFile(file),
- (None, Some(files)) =>
ManifestListLocation::ManifestFiles(files),
- (None, None) => {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Neither manifestlist file or manifest files are
provided.",
- ))
- }
- },
+ (Some(file), None) => file,
+ (Some(_), Some(_)) => "Invalid v1 snapshot, when manifest
list provided, manifest files should be omitted".to_string(),
+ (None, _) => "Unsupported v1 snapshot, only manifest list
is supported".to_string()
+ },
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
@@ -330,18 +298,14 @@ pub(super) mod _serde {
impl From<Snapshot> for SnapshotV1 {
fn from(v2: Snapshot) -> Self {
- let (manifest_list, manifests) = match v2.manifest_list {
- ManifestListLocation::ManifestListFile(file) => (Some(file),
None),
- ManifestListLocation::ManifestFiles(files) => (None,
Some(files)),
- };
SnapshotV1 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
timestamp_ms: v2.timestamp_ms,
- manifest_list,
- manifests,
+ manifest_list: Some(v2.manifest_list),
summary: Some(v2.summary),
schema_id: v2.schema_id,
+ manifests: None,
}
}
}
@@ -403,9 +367,7 @@ mod tests {
use chrono::{TimeZone, Utc};
use std::collections::HashMap;
- use crate::spec::snapshot::{
- ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1,
- };
+ use crate::spec::snapshot::{Operation, Snapshot, Summary,
_serde::SnapshotV1};
#[test]
fn schema() {
@@ -437,9 +399,6 @@ mod tests {
},
*result.summary()
);
- assert_eq!(
-
ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()),
- *result.manifest_list()
- );
+ assert_eq!("s3://b/wh/.../s1.avro".to_string(),
*result.manifest_list());
}
}
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index 18e9ce8..9903d6e 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -839,9 +839,9 @@ mod tests {
use pretty_assertions::assert_eq;
use crate::spec::{
- table_metadata::TableMetadata, ManifestListLocation, NestedField,
NullOrder, Operation,
- PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot,
SnapshotReference,
- SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Transform, Type,
+ table_metadata::TableMetadata, NestedField, NullOrder, Operation,
PartitionField,
+ PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention,
+ SortDirection, SortField, SortOrder, Summary, Transform, Type,
};
use super::{FormatVersion, MetadataLog, SnapshotLog};
@@ -1104,7 +1104,7 @@ mod tests {
.with_timestamp_ms(1662532818843)
.with_sequence_number(0)
.with_schema_id(0)
-
.with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()))
+
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
.with_summary(Summary { operation: Operation::Append, other:
HashMap::from_iter(vec![("spark.app.id".to_string(),
"local-1662532784305".to_string()), ("added-data-files".to_string(),
"4".to_string()), ("added-records".to_string(), "4".to_string()),
("added-files-size".to_string(), "6001".to_string())]) })
.build();
@@ -1228,9 +1228,7 @@ mod tests {
.with_snapshot_id(3051729675574597004)
.with_timestamp_ms(1515100955770)
.with_sequence_number(0)
- .with_manifest_list(ManifestListLocation::ManifestListFile(
- "s3://a/b/1.avro".to_string(),
- ))
+ .with_manifest_list("s3://a/b/1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
@@ -1243,9 +1241,7 @@ mod tests {
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_schema_id(1)
- .with_manifest_list(ManifestListLocation::ManifestListFile(
- "s3://a/b/2.avro".to_string(),
- ))
+ .with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),