This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a8df9a1  feat: support reading MOR with rollback (#264)
a8df9a1 is described below

commit a8df9a1040098dfd2d15cecc85bf1ff4d035363f
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 01:20:44 2025 -0600

    feat: support reading MOR with rollback (#264)
    
    - Introduce `LogFileScanner` to handle reading log files.
    - Handle command log block for rollback
---
 crates/core/src/file_group/log_file/log_block.rs   |  94 +++++++++++++++++++++
 crates/core/src/file_group/log_file/mod.rs         |   1 +
 crates/core/src/file_group/log_file/reader.rs      |  89 ++++++++++++-------
 crates/core/src/file_group/log_file/scanner.rs     |  81 ++++++++++++++++++
 crates/core/src/file_group/reader.rs               |  32 +++----
 crates/core/src/table/mod.rs                       |  20 +++++
 ...f4-8fd7146af503-0_20250126040823628.log.2_1-0-1 | Bin 0 -> 105 bytes
 .../data/tables/mor/v6_nonpartitioned_rollback.sql |  94 +++++++++++++++++++++
 .../data/tables/mor/v6_nonpartitioned_rollback.zip | Bin 0 -> 32598 bytes
 crates/test/src/lib.rs                             |  24 ++++--
 10 files changed, 381 insertions(+), 54 deletions(-)

diff --git a/crates/core/src/file_group/log_file/log_block.rs b/crates/core/src/file_group/log_file/log_block.rs
index e1b685d..6337480 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -129,6 +129,28 @@ impl TryFrom<[u8; 4]> for BlockMetadataKey {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[repr(u32)]
+pub enum CommandBlock {
+    Rollback = 0,
+}
+
+impl FromStr for CommandBlock {
+    type Err = CoreError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.parse::<u32>() {
+            Ok(0) => Ok(CommandBlock::Rollback),
+            Ok(val) => Err(CoreError::LogFormatError(format!(
+                "Invalid command block type value: {val}"
+            ))),
+            Err(e) => Err(CoreError::LogFormatError(format!(
+                "Failed to parse command block type: {e}"
+            ))),
+        }
+    }
+}
+
 #[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub struct LogBlock {
@@ -153,11 +175,62 @@ impl LogBlock {
                 }
                 Ok(batches)
             }
+            BlockType::Command => Ok(Vec::new()),
             _ => Err(CoreError::LogBlockError(format!(
                 "Unsupported block type: {block_type:?}"
             ))),
         }
     }
+
+    pub fn instant_time(&self) -> Result<&str> {
+        let v = self
+            .header
+            .get(&BlockMetadataKey::InstantTime)
+            .ok_or_else(|| CoreError::LogBlockError("Instant time not found".to_string()))?;
+        Ok(v)
+    }
+
+    pub fn target_instant_time(&self) -> Result<&str> {
+        if self.block_type != BlockType::Command {
+            return Err(CoreError::LogBlockError(
+                "Target instant time is only available for command blocks".to_string(),
+            ));
+        }
+        let v = self
+            .header
+            .get(&BlockMetadataKey::TargetInstantTime)
+            .ok_or_else(|| CoreError::LogBlockError("Target instant time not found".to_string()))?;
+        Ok(v)
+    }
+
+    pub fn schema(&self) -> Result<&str> {
+        let v = self
+            .header
+            .get(&BlockMetadataKey::Schema)
+            .ok_or_else(|| CoreError::LogBlockError("Schema not found".to_string()))?;
+        Ok(v)
+    }
+
+    pub fn command_block_type(&self) -> Result<CommandBlock> {
+        if self.block_type != BlockType::Command {
+            return Err(CoreError::LogBlockError(
+                "Command block type is only available for command blocks".to_string(),
+            ));
+        }
+        let v = self
+            .header
+            .get(&BlockMetadataKey::CommandBlockType)
+            .ok_or_else(|| {
+                CoreError::LogBlockError(
+                    "Command block type not found for command block".to_string(),
+                )
+            })?;
+        v.parse::<CommandBlock>()
+    }
+
+    pub fn is_rollback_block(&self) -> bool {
+        matches!(self.command_block_type(), Ok(CommandBlock::Rollback))
+    }
 }
 
 #[cfg(test)]
@@ -305,4 +378,25 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_valid_rollback_block() {
+        assert_eq!(CommandBlock::from_str("0").unwrap(), CommandBlock::Rollback);
+    }
+
+    #[test]
+    fn test_invalid_rollback_block() {
+        assert!(matches!(
+            CommandBlock::from_str("1"),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Invalid command block type value: 1")
+        ));
+        assert!(matches!(
+            CommandBlock::from_str("invalid"),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Failed to parse command block type")
+        ));
+        assert!(matches!(
+            CommandBlock::from_str(""),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Failed to parse command block type")
+        ));
+    }
 }
diff --git a/crates/core/src/file_group/log_file/mod.rs b/crates/core/src/file_group/log_file/mod.rs
index 2012861..d473bc2 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -25,6 +25,7 @@ use std::str::FromStr;
 mod log_block;
 mod log_format;
 pub mod reader;
+pub mod scanner;
 
 #[derive(Clone, Debug)]
 pub struct LogFile {
diff --git a/crates/core/src/file_group/log_file/reader.rs b/crates/core/src/file_group/log_file/reader.rs
index 07a990d..7ebe270 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -28,7 +28,6 @@ use crate::storage::reader::StorageReader;
 use crate::storage::Storage;
 use crate::timeline::selector::InstantRange;
 use crate::Result;
-use arrow_array::RecordBatch;
 use bytes::BytesMut;
 use std::collections::HashMap;
 use std::io::{self, Read, Seek};
@@ -65,7 +64,7 @@ impl LogFileReader<StorageReader> {
         })
     }
 
-    fn read_all_blocks(&mut self, instant_range: &InstantRange) -> Result<Vec<LogBlock>> {
+    pub fn read_all_blocks(&mut self, instant_range: &InstantRange) -> Result<Vec<LogBlock>> {
         let mut blocks = Vec::new();
         while let Some(block) = self.read_next_block(instant_range)? {
             if block.skipped {
@@ -75,18 +74,6 @@ impl LogFileReader<StorageReader> {
         }
         Ok(blocks)
     }
-
-    pub fn read_all_records_unmerged(
-        &mut self,
-        instant_range: &InstantRange,
-    ) -> Result<Vec<RecordBatch>> {
-        let all_blocks = self.read_all_blocks(instant_range)?;
-        let mut batches = Vec::new();
-        for block in all_blocks {
-            batches.extend_from_slice(&block.record_batches);
-        }
-        Ok(batches)
-    }
 }
 
 impl<R: Read + Seek> LogFileReader<R> {
@@ -289,44 +276,82 @@ impl<R: Read + Seek> LogFileReader<R> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::file_group::log_file::log_block::CommandBlock;
+    use crate::storage::util::parse_uri;
     use std::fs::canonicalize;
     use std::path::PathBuf;
-    use url::Url;
 
-    fn get_sample_log_file() -> (PathBuf, String) {
+    fn get_valid_log_parquet() -> (String, String) {
         let dir = PathBuf::from("tests/data/log_files/valid_log_parquet");
         (
-            canonicalize(dir).unwrap(),
+            canonicalize(dir).unwrap().to_str().unwrap().to_string(),
             ".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(),
         )
     }
 
-    #[tokio::test]
-    async fn test_read_sample_log_file() {
-        let (dir, file_name) = get_sample_log_file();
-        let dir_url = Url::from_directory_path(dir).unwrap();
+    fn get_valid_log_rollback() -> (String, String) {
+        let dir = PathBuf::from("tests/data/log_files/valid_log_rollback");
+        (
+            canonicalize(dir).unwrap().to_str().unwrap().to_string(),
+            ".0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1".to_string(),
+        )
+    }
+
+    async fn create_log_file_reader(
+        dir: &str,
+        file_name: &str,
+    ) -> Result<LogFileReader<StorageReader>> {
+        let dir_url = parse_uri(dir)?;
         let hudi_configs = Arc::new(HudiConfigs::empty());
-        let storage = Storage::new_with_base_url(dir_url).unwrap();
-        let mut reader = LogFileReader::new(hudi_configs, storage, &file_name)
-            .await
-            .unwrap();
+        let storage = Storage::new_with_base_url(dir_url)?;
+        LogFileReader::new(hudi_configs, storage, file_name).await
+    }
+
+    #[tokio::test]
+    async fn test_read_log_file_with_parquet_data_block() -> Result<()> {
+        let (dir, file_name) = get_valid_log_parquet();
+        let mut reader = create_log_file_reader(&dir, &file_name).await?;
         let instant_range = InstantRange::up_to("20250113230424191", "utc");
-        let blocks = reader.read_all_blocks(&instant_range).unwrap();
+        let blocks = reader.read_all_blocks(&instant_range)?;
         assert_eq!(blocks.len(), 1);
 
         let block = &blocks[0];
         assert_eq!(block.format_version, LogFormatVersion::V1);
         assert_eq!(block.block_type, BlockType::ParquetData);
         assert_eq!(block.header.len(), 2);
-        assert_eq!(
-            block.header.get(&BlockMetadataKey::InstantTime).unwrap(),
-            "20250113230424191"
-        );
-        assert!(block.header.contains_key(&BlockMetadataKey::Schema));
-        assert_eq!(block.footer.len(), 0);
+        assert_eq!(block.instant_time()?, "20250113230424191");
+        assert!(block.target_instant_time().is_err());
+        assert!(block.schema().is_ok());
+        assert!(block.command_block_type().is_err());
 
         let batches = block.record_batches.as_slice();
         assert_eq!(batches.len(), 1);
         assert_eq!(batches[0].num_rows(), 1);
+
+        assert!(block.footer.is_empty());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_log_file_with_rollback_block() -> Result<()> {
+        let (dir, file_name) = get_valid_log_rollback();
+        let mut reader = create_log_file_reader(&dir, &file_name).await?;
+        let instant_range = InstantRange::up_to("20250126040936578", "utc");
+        let blocks = reader.read_all_blocks(&instant_range)?;
+        assert_eq!(blocks.len(), 1);
+
+        let block = &blocks[0];
+        assert_eq!(block.format_version, LogFormatVersion::V1);
+        assert_eq!(block.block_type, BlockType::Command);
+        assert_eq!(block.header.len(), 3);
+        assert_eq!(block.instant_time()?, "20250126040936578");
+        assert_eq!(block.target_instant_time()?, "20250126040826878");
+        assert!(block.schema().is_err());
+        assert_eq!(block.command_block_type()?, CommandBlock::Rollback);
+        assert!(block.record_batches.is_empty());
+        assert!(block.footer.is_empty());
+
+        Ok(())
     }
 }
diff --git a/crates/core/src/file_group/log_file/scanner.rs b/crates/core/src/file_group/log_file/scanner.rs
new file mode 100644
index 0000000..d6ef227
--- /dev/null
+++ b/crates/core/src/file_group/log_file/scanner.rs
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::HudiConfigs;
+use crate::file_group::log_file::log_block::LogBlock;
+use crate::file_group::log_file::reader::LogFileReader;
+use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
+use crate::Result;
+use arrow_array::RecordBatch;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub struct LogFileScanner {
+    hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
+}
+
+impl LogFileScanner {
+    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+        }
+    }
+
+    pub async fn scan(
+        &self,
+        relative_paths: Vec<String>,
+        instant_range: &InstantRange,
+    ) -> Result<Vec<Vec<RecordBatch>>> {
+        let mut all_blocks: Vec<Vec<LogBlock>> = Vec::with_capacity(relative_paths.len());
+        let mut rollback_targets: HashSet<String> = HashSet::new();
+
+        // collect all blocks and rollback targets
+        for path in relative_paths {
+            let mut reader =
+                LogFileReader::new(self.hudi_configs.clone(), self.storage.clone(), &path).await?;
+            let blocks = reader.read_all_blocks(instant_range)?;
+
+            for block in &blocks {
+                if block.is_rollback_block() {
+                    rollback_targets.insert(block.target_instant_time()?.to_string());
+                }
+            }
+
+            // only rollback and parquet data blocks are supported
+            // TODO: support more block types
+            // push the whole vector to avoid cloning
+            all_blocks.push(blocks);
+        }
+
+        // collect valid record batches
+        let mut record_batches: Vec<Vec<RecordBatch>> = Vec::new();
+        for blocks in all_blocks {
+            for block in blocks {
+                if !rollback_targets.contains(block.instant_time()?) {
+                    record_batches.push(block.record_batches);
+                }
+            }
+        }
+
+        Ok(record_batches)
+    }
+}
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index ce2ae1c..6b875d2 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -30,7 +30,7 @@ use arrow_schema::Schema;
 use futures::TryFutureExt;
 use std::sync::Arc;
 
-use crate::file_group::log_file::reader::LogFileReader;
+use crate::file_group::log_file::scanner::LogFileScanner;
 use crate::merge::record_merger::RecordMerger;
 use crate::timeline::selector::InstantRange;
 use arrow::compute::filter_record_batch;
@@ -129,27 +129,29 @@ impl FileGroupReader {
     ) -> Result<RecordBatch> {
         let relative_path = file_slice.base_file_relative_path()?;
         if base_file_only {
-            // TODO caller to support read optimized queries
             self.read_file_slice_by_base_file_path(&relative_path).await
         } else {
-            let base_file_records = self
+            let base_record_batch = self
                 .read_file_slice_by_base_file_path(&relative_path)
                 .await?;
-            let schema = base_file_records.schema();
-            let mut all_records = vec![base_file_records];
-
-            for log_file in &file_slice.log_files {
-                let relative_path = file_slice.log_file_relative_path(log_file)?;
-                let hudi_configs = self.hudi_configs.clone();
-                let storage = self.storage.clone();
-                let mut log_file_reader =
-                    LogFileReader::new(hudi_configs, storage, &relative_path).await?;
-                let log_file_records = log_file_reader.read_all_records_unmerged(&instant_range)?;
-                all_records.extend_from_slice(&log_file_records);
+            let schema = base_record_batch.schema();
+            let mut all_record_batches = vec![base_record_batch];
+
+            let log_file_paths = file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?;
+            let log_record_batches =
+                LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone())
+                    .scan(log_file_paths, &instant_range)
+                    .await?;
+            for log_record_batch in log_record_batches {
+                all_record_batches.extend(log_record_batch);
             }
 
             let merger = RecordMerger::new(self.hudi_configs.clone());
-            merger.merge_record_batches(&schema, &all_records)
+            merger.merge_record_batches(&schema, &all_record_batches)
         }
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index ac58679..5154a04 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -993,6 +993,26 @@ mod tests {
             Ok(())
         }
 
+        #[tokio::test]
+        async fn test_non_partitioned_rollback() -> Result<()> {
+            let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor();
+            let hudi_table = Table::new(base_url.path()).await?;
+            let records = hudi_table.read_snapshot(&[]).await?;
+            let schema = &records[0].schema();
+            let records = concat_batches(schema, &records)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", true), // this was updated to false then rolled back to true
+                    (2, "Bob", true),   // this was updated to true after rollback
+                    (3, "Carol", true),
+                ]
+            );
+            Ok(())
+        }
+
         #[tokio::test]
         async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
             for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
diff --git a/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1 b/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1
new file mode 100644
index 0000000..fbc4778
Binary files /dev/null and b/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1 differ
diff --git a/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql
new file mode 100644
index 0000000..23ef3cf
--- /dev/null
+++ b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_nonpartitioned_rollback (
+                                   id INT,
+                                   name STRING,
+                                   isActive BOOLEAN,
+                                   byteField BYTE,
+                                   shortField SHORT,
+                                   intField INT,
+                                   longField LONG,
+                                   floatField FLOAT,
+                                   doubleField DOUBLE,
+                                   decimalField DECIMAL(10,5),
+                                   dateField DATE,
+                                   timestampField TIMESTAMP,
+                                   binaryField BINARY,
+                                   arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>,  -- Array of structs
+                                   mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>,  -- Map with struct values
+                                   structField STRUCT<
+                                       field1: STRING,
+                                   field2: INT,
+                                   child_struct: STRUCT<
+                                       child_field1: DOUBLE,
+                                   child_field2: BOOLEAN
+                                       >
+                                       >
+)
+    USING HUDI
+    location '/opt/data/external_tables/v6_nonpartitioned_rollback'
+TBLPROPERTIES (
+    type = 'mor',
+    primaryKey = 'id',
+    preCombineField = 'longField',
+    'hoodie.metadata.enable' = 'false',
+    'hoodie.table.log.file.format' = 'PARQUET',
+    'hoodie.logfile.data.block.format' = 'parquet',
+    'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
+    'hoodie.parquet.small.file.limit' = '0'
+);
+
+INSERT INTO v6_nonpartitioned_rollback VALUES
+                                  (1, 'Alice', true, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                   ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+                                   MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+                                   STRUCT('Alice', 30, STRUCT(123.456, true))
+                                  ),
+                                  (2, 'Bob', false, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+                                   ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+                                   MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+                                   STRUCT('Bob', 40, STRUCT(789.012, false))
+                                  ),
+                                  (3, 'Carol', true, 1, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
+                                   ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
+                                   MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
+                                   STRUCT('Carol', 25, STRUCT(456.789, true))
+                                  );
+
+INSERT INTO v6_nonpartitioned_rollback VALUES
+                                  (1, 'Alice', false, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                   ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+                                   MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+                                   STRUCT('Alice', 30, STRUCT(123.456, true))
+                                  ),
+                                  (4, 'Diana', true, 1, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+                                   ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
+                                   MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
+                                   STRUCT('Diana', 50, STRUCT(987.654, true))
+                                  );
+
+call rollback_to_instant(table => 'v6_nonpartitioned_rollback', instant_time => '20250126035006837');
+
+INSERT INTO v6_nonpartitioned_rollback VALUES
+                                           (2, 'Bob', true, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+                                            ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+                                            MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+                                            STRUCT('Bob', 40, STRUCT(789.012, false))
+                                           );
diff --git a/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip
new file mode 100644
index 0000000..aa2ee74
Binary files /dev/null and b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip differ
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index e99e857..1df1a1f 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -41,6 +41,7 @@ pub enum SampleTable {
     V6ComplexkeygenHivestyle,
     V6Empty,
     V6Nonpartitioned,
+    V6NonpartitionedRollback,
     V6SimplekeygenHivestyleNoMetafields,
     V6SimplekeygenNonhivestyle,
     V6SimplekeygenNonhivestyleOverwritetable,
@@ -100,10 +101,6 @@ impl SampleTable {
         path_buf.to_str().unwrap().to_string()
     }
 
-    pub fn paths(&self) -> Vec<String> {
-        vec![self.path_to_cow(), self.path_to_mor()]
-    }
-
     pub fn url_to_cow(&self) -> Url {
         let path = self.path_to_cow();
         Url::from_file_path(path).unwrap()
@@ -128,9 +125,22 @@ mod tests {
     #[test]
     fn sample_table_zip_file_should_exist() {
         for t in SampleTable::iter() {
-            let path = t.zip_path("cow");
-            assert!(path.exists());
-            assert!(path.is_file());
+            match t {
+                SampleTable::V6TimebasedkeygenNonhivestyle => {
+                    let path = t.zip_path("cow");
+                    assert!(path.exists());
+                }
+                SampleTable::V6NonpartitionedRollback => {
+                    let path = t.zip_path("mor");
+                    assert!(path.exists());
+                }
+                _ => {
+                    let path = t.zip_path("cow");
+                    assert!(path.exists());
+                    let path = t.zip_path("mor");
+                    assert!(path.exists());
+                }
+            }
         }
     }
 }

Reply via email to