This is an automated email from the ASF dual-hosted git repository. sdd pushed a commit to branch feature/table-scan-delete-file-handling in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
commit a258d4a0837df1c0940270fc71a643b0870bcc63 Author: Scott Donnelly <sc...@donnel.ly> AuthorDate: Wed Sep 25 07:57:28 2024 +0100 feat: add delete_file_index and populate in table scan --- crates/iceberg/src/arrow/reader.rs | 20 +- crates/iceberg/src/delete_file_index.rs | 110 ++++++++ crates/iceberg/src/expr/predicate.rs | 6 + crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/scan.rs | 276 +++++++++++++++++---- .../testdata/example_table_metadata_v2.json | 16 +- 6 files changed, 372 insertions(+), 58 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b4e15821..7e6df878 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -47,8 +47,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; -use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, PrimitiveType, Schema}; +use crate::scan::{ + ArrowRecordBatchStream, FileScanTask, FileScanTaskStream, +}; +use crate::spec::{ + Datum, PrimitiveType, Schema, +}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -712,10 +716,14 @@ impl PredicateConverter<'_> { let index = self .column_indices .iter() - .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!( - "Leave column `{}` in predicates cannot be found in the required column indices.", - reference.field().name - )))?; + .position(|&idx| idx == *column_idx) + .ok_or(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column `{}` in predicates cannot be found in the required column indices.", + reference.field().name + ), + ))?; Ok(Some(index)) } else { diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs new file mode 100644 index 00000000..5265c08f --- /dev/null +++ b/crates/iceberg/src/delete_file_index.rs @@ -0,0 +1,110 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use futures::channel::mpsc; +use futures::{StreamExt, TryStreamExt}; +use tokio::sync::watch; + +use crate::scan::FileScanTaskDeleteFile; +use crate::spec::{DataContentType, DataFile}; +use crate::Result; + +type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>; +pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>; + +/// Index of delete files +#[derive(Debug)] +pub(crate) struct DeleteFileIndex { + #[allow(dead_code)] + global_deletes: Vec<Arc<FileScanTaskDeleteFile>>, + #[allow(dead_code)] + equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>, + #[allow(dead_code)] + positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>>, + positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>>, +} + +impl DeleteFileIndex { + pub(crate) fn from_receiver( + receiver: mpsc::Receiver<Result<FileScanTaskDeleteFile>>, + ) -> watch::Receiver<Option<DeleteFileIndexRef>> { + let (tx, rx) = watch::channel(None); + + let delete_file_stream = receiver.boxed(); + tokio::spawn(async move { + let delete_files = delete_file_stream.try_collect::<Vec<_>>().await; + let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files); + let delete_file_index = Arc::new(delete_file_index); + tx.send(Some(delete_file_index)) + }); + + rx + } + + fn from_delete_files(files: Vec<FileScanTaskDeleteFile>) -> Self { + let mut equality_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> = + HashMap::default(); + let mut positional_deletes_by_partition: HashMap<i32, Vec<Arc<FileScanTaskDeleteFile>>> = + HashMap::default(); + let mut positional_deletes_by_path: HashMap<String, Vec<Arc<FileScanTaskDeleteFile>>> = + HashMap::default(); + + files.into_iter().for_each(|file| { + let arc_file = Arc::new(file); + match arc_file.file_type { + DataContentType::PositionDeletes => { + // TODO: implement logic from ContentFileUtil.referencedDataFile + // see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54 + let referenced_data_file_path = "TODO".to_string(); + + positional_deletes_by_path + .entry(referenced_data_file_path) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + + positional_deletes_by_partition + .entry(arc_file.partition_spec_id) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + } + DataContentType::EqualityDeletes => { + equality_deletes_by_partition + .entry(arc_file.partition_spec_id) + .and_modify(|entry| { + entry.push(arc_file.clone()); + }) + .or_insert(vec![arc_file.clone()]); + } + _ => unreachable!(), + } + }); + + DeleteFileIndex { + global_deletes: vec![], + equality_deletes_by_partition, + positional_deletes_by_partition, + positional_deletes_by_path, + } + } + + /// Determine all the delete files that apply to the provided `DataFile`. + pub(crate) fn get_deletes_for_data_file( + &self, + data_file: &DataFile, + ) -> Vec<FileScanTaskDeleteFile> { + let mut results = vec![]; + + if let Some(positional_deletes) = self.positional_deletes_by_path.get(data_file.file_path()) + { + results.extend(positional_deletes.iter().map(|i| i.as_ref().clone())) + } + + // TODO: equality deletes + + results + } +} diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index e0f6a784..f38a0cca 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -724,6 +724,12 @@ pub enum BoundPredicate { Set(SetExpression<BoundReference>), } +impl BoundPredicate { + pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate { + BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)])) + } +} + impl Display for BoundPredicate { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index fe5a5299..466592bb 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -83,6 +83,8 @@ pub mod transform; mod runtime; pub mod arrow; +// pub(crate) mod delete_file_index; +pub(crate) mod delete_file_index; mod utils; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5a97e74e..acf10f5e 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -27,6 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::arrow::ArrowReaderBuilder; +use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver}; use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::visitors::inclusive_projection::InclusiveProjection; @@ -62,6 +63,11 @@ pub struct TableScanBuilder<'a> { concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + + // TODO: defaults to false for now whilst delete file processing + // is still being worked on but will switch to a default of true + // once this work is complete + delete_file_processing_enabled: bool, } impl<'a> TableScanBuilder<'a> { @@ -80,6 +86,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_processing_enabled: false, } } @@ -186,6 +193,17 @@ impl<'a> TableScanBuilder<'a> { self } + /// Determines whether to enable delete file processing (currently disabled by default) + /// + /// When disabled, delete files are ignored. + pub fn with_delete_file_processing_enabled( + mut self, + delete_file_processing_enabled: bool, + ) -> Self { + self.delete_file_processing_enabled = delete_file_processing_enabled; + self + } + /// Build the table scan. pub fn build(self) -> Result<TableScan> { let snapshot = match self.snapshot_id { @@ -304,6 +322,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_processing_enabled: self.delete_file_processing_enabled, }) } } @@ -329,6 +348,7 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_processing_enabled: bool, } /// PlanContext wraps a [`SnapshotRef`] alongside all the other @@ -357,18 +377,39 @@ impl TableScan { let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; // used to stream ManifestEntryContexts between stages of the file plan operation - let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = + channel(concurrency_limit_manifest_files); + let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(concurrency_limit_manifest_files); + // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + let delete_file_idx_and_tx: Option<( + DeleteFileIndexRefReceiver, + Sender<Result<FileScanTaskDeleteFile>>, + )> = if self.delete_file_processing_enabled { + // used to stream delete files into the DeleteFileIndex + let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); + + let delete_file_index_rx = DeleteFileIndex::from_receiver(delete_file_rx); + Some((delete_file_index_rx, delete_file_tx)) + } else { + None + }; + let manifest_list = self.plan_context.get_manifest_list().await?; - // get the [`ManifestFile`]s from the [`ManifestList`], filtering out - // partitions cannot match the scan's filter - let manifest_file_contexts = self - .plan_context - .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; + // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any + // whose partitions cannot match this + // scan's filter + let manifest_file_contexts = self.plan_context.build_manifest_file_contexts( + manifest_list, + manifest_entry_data_ctx_tx, + delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| { + (delete_file_idx.clone(), manifest_entry_delete_ctx_tx) + }), + )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -385,17 +426,45 @@ impl TableScan { } }); - let mut channel_for_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + + if let Some((_, delete_file_tx)) = delete_file_idx_and_tx { + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(manifest_entry_context, tx) + .await + }) + .await + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + } - // Process the [`ManifestEntry`] stream in parallel + // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { - let result = manifest_entry_ctx_rx + let result = manifest_entry_data_ctx_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) .try_for_each_concurrent( concurrency_limit_manifest_entries, |(manifest_entry_context, tx)| async move { spawn(async move { - Self::process_manifest_entry(manifest_entry_context, tx).await + Self::process_data_manifest_entry(manifest_entry_context, tx).await }) .await }, @@ -403,7 +472,7 @@ impl TableScan { .await; if let Err(error) = result { - let _ = channel_for_manifest_entry_error.send(Err(error)).await; + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; } }); @@ -437,7 +506,7 @@ impl TableScan { &self.plan_context.snapshot } - async fn process_manifest_entry( + async fn process_data_manifest_entry( manifest_entry_context: ManifestEntryContext, mut file_scan_task_tx: Sender<Result<FileScanTask>>, ) -> Result<()> { @@ -446,12 +515,11 @@ impl TableScan { return Ok(()); } - // abort the plan if we encounter a manifest entry whose data file's - // content type is currently unsupported + // abort the plan if we encounter a manifest entry for a delete file if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::FeatureUnsupported, - "Only Data files currently supported", + "Encountered an entry for a delete file in a data file manifest", )); } @@ -489,7 +557,54 @@ impl TableScan { // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task())) + .send(Ok(manifest_entry_context.into_file_scan_task().await?)) + .await?; + + Ok(()) + } + + async fn process_delete_manifest_entry( + manifest_entry_context: ManifestEntryContext, + mut file_scan_task_delete_file_tx: Sender<Result<FileScanTaskDeleteFile>>, + ) -> Result<()> { + // skip processing this manifest entry if it has been marked as deleted + if !manifest_entry_context.manifest_entry.is_alive() { + return Ok(()); + } + + // abort the plan if we encounter a manifest entry that is not for a delete file + if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a data file in a delete manifest", + )); + } + + if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates { + let expression_evaluator_cache = + manifest_entry_context.expression_evaluator_cache.as_ref(); + + let expression_evaluator = expression_evaluator_cache.get( + manifest_entry_context.partition_spec_id, + &bound_predicates.partition_bound_predicate, + )?; + + // skip any data file whose partition data indicates that it can't contain + // any data that matches this scan's filter + if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { + return Ok(()); + } + } + + file_scan_task_delete_file_tx + .send(Ok(FileScanTaskDeleteFile { + file_path: manifest_entry_context + .manifest_entry + .file_path() + .to_string(), + file_type: manifest_entry_context.manifest_entry.content_type(), + partition_spec_id: manifest_entry_context.partition_spec_id, + })) .await?; Ok(()) @@ -513,6 +628,7 @@ struct ManifestFileContext { object_cache: Arc<ObjectCache>, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, + delete_file_index: Option<DeleteFileIndexRefReceiver>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -525,6 +641,7 @@ struct ManifestEntryContext { bound_predicates: Option<Arc<BoundPredicates>>, partition_spec_id: i32, snapshot_schema: SchemaRef, + delete_file_index: Option<DeleteFileIndexRefReceiver>, } impl ManifestFileContext { @@ -539,6 +656,7 @@ impl ManifestFileContext { field_ids, mut sender, expression_evaluator_cache, + delete_file_index, .. } = self; @@ -546,13 +664,14 @@ impl ManifestFileContext { for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { - // TODO: refactor to avoid clone + // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), expression_evaluator_cache: expression_evaluator_cache.clone(), field_ids: field_ids.clone(), partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), + delete_file_index: delete_file_index.clone(), }; sender @@ -568,8 +687,34 @@ impl ManifestFileContext { impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - fn into_file_scan_task(self) -> FileScanTask { - FileScanTask { + async fn into_file_scan_task(self) -> Result<FileScanTask> { + // let deletes = self.get_deletes().await?; + + let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index { + let del_file_idx_opt = delete_file_index_rx + .wait_for(Option::is_some) + .await + .map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?; + + match del_file_idx_opt.as_ref() { + Some(del_file_idx) => match del_file_idx.as_ref() { + Ok(delete_file_idx) => { + delete_file_idx.get_deletes_for_data_file(self.manifest_entry.data_file()) + } + Err(err) => { + return Err(Error::new(ErrorKind::Unexpected, err.message())); + } + }, + + // the `wait_for(Option::is_some)` above means that we can + // never get a `None` here + None => unreachable!(), + } + } else { + vec![] + }; + + Ok(FileScanTask { start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), @@ -583,7 +728,9 @@ impl ManifestEntryContext { predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - } + + deletes, + }) } } @@ -619,29 +766,33 @@ impl PlanContext { fn build_manifest_file_contexts( &self, manifest_list: Arc<ManifestList>, - sender: Sender<ManifestEntryContext>, + tx_data: Sender<ManifestEntryContext>, + delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender<ManifestEntryContext>)>, ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> { - let entries = manifest_list.entries(); - - if entries - .iter() - .any(|e| e.content != ManifestContentType::Data) - { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Merge-on-read is not yet supported", - )); - } + let manifest_files = manifest_list.entries().iter(); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; - if self.predicate.is_some() { - for manifest_file in entries { + + for manifest_file in manifest_files { + let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes { + let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else { + continue; + }; + (Some(delete_file_idx.clone()), tx.clone()) + } else { + ( + delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()), + tx_data.clone(), + ) + }; + + let partition_bound_predicate = if self.predicate.is_some() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip // if it cannot contain any matching rows - if self + if !self .manifest_evaluator_cache .get( manifest_file.partition_spec_id, @@ -649,19 +800,22 @@ impl PlanContext { ) .eval(manifest_file)? { - let mfc = self.create_manifest_file_context( - manifest_file, - Some(partition_bound_predicate), - sender.clone(), - ); - filtered_mfcs.push(Ok(mfc)); + continue; } - } - } else { - for manifest_file in entries { - let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); - filtered_mfcs.push(Ok(mfc)); - } + + Some(partition_bound_predicate) + } else { + None + }; + + let mfc = self.create_manifest_file_context( + manifest_file, + partition_bound_predicate, + tx, + delete_file_idx, + ); + + filtered_mfcs.push(Ok(mfc)); } Ok(Box::new(filtered_mfcs.into_iter())) @@ -672,6 +826,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option<Arc<BoundPredicate>>, sender: Sender<ManifestEntryContext>, + delete_file_index: Option<DeleteFileIndexRefReceiver>, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -693,6 +848,7 @@ impl PlanContext { snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), + delete_file_index, } } } @@ -919,8 +1075,10 @@ pub struct FileScanTask { /// The data file path corresponding to the task. pub data_file_path: String, + /// The content type of the file to scan. pub data_file_content: DataContentType, + /// The format of the file to scan. pub data_file_format: DataFileFormat, @@ -931,6 +1089,22 @@ pub struct FileScanTask { /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option<BoundPredicate>, + + /// The list of delete files that may need to be applied to this data file + pub deletes: Vec<FileScanTaskDeleteFile>, +} + +/// A task to scan part of file. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FileScanTaskDeleteFile { + /// The delete file path + pub file_path: String, + + /// delete file type + pub file_type: DataContentType, + + /// partition id + pub partition_spec_id: i32, } impl FileScanTask { @@ -1416,16 +1590,16 @@ pub mod tests { .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche1: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche2: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); - assert_eq!(batche1, batche2); + assert_eq!(batch_1, batch_2); } #[tokio::test] @@ -1867,6 +2041,7 @@ pub mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + deletes: vec![], }; test_fn(task); @@ -1881,6 +2056,7 @@ pub mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + deletes: vec![], }; test_fn(task); } diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json index 35230966..17bbd7d9 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2.json +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -7,7 +7,12 @@ "last-column-id": 3, "current-schema-id": 1, "schemas": [ - {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}, { "type": "struct", "schema-id": 1, @@ -25,7 +30,14 @@ } ], "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + {"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000} + ] + } + ], "last-partition-id": 1000, "default-sort-order-id": 3, "sort-orders": [