This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a8df9a1 feat: support reading MOR with rollback (#264)
a8df9a1 is described below
commit a8df9a1040098dfd2d15cecc85bf1ff4d035363f
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 01:20:44 2025 -0600
feat: support reading MOR with rollback (#264)
- Introduce `LogFileScanner` to handle reading log files.
- Handle command log block for rollback
---
crates/core/src/file_group/log_file/log_block.rs | 94 +++++++++++++++++++++
crates/core/src/file_group/log_file/mod.rs | 1 +
crates/core/src/file_group/log_file/reader.rs | 89 ++++++++++++-------
crates/core/src/file_group/log_file/scanner.rs | 81 ++++++++++++++++++
crates/core/src/file_group/reader.rs | 32 +++----
crates/core/src/table/mod.rs | 20 +++++
...f4-8fd7146af503-0_20250126040823628.log.2_1-0-1 | Bin 0 -> 105 bytes
.../data/tables/mor/v6_nonpartitioned_rollback.sql | 94 +++++++++++++++++++++
.../data/tables/mor/v6_nonpartitioned_rollback.zip | Bin 0 -> 32598 bytes
crates/test/src/lib.rs | 24 ++++--
10 files changed, 381 insertions(+), 54 deletions(-)
diff --git a/crates/core/src/file_group/log_file/log_block.rs b/crates/core/src/file_group/log_file/log_block.rs
index e1b685d..6337480 100644
--- a/crates/core/src/file_group/log_file/log_block.rs
+++ b/crates/core/src/file_group/log_file/log_block.rs
@@ -129,6 +129,28 @@ impl TryFrom<[u8; 4]> for BlockMetadataKey {
}
}
+/// Command carried by a command log block.
+///
+/// The `FromStr` implementation accepts the decimal string form of the discriminant.
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[repr(u32)]
+pub enum CommandBlock {
+ /// Rollback command, encoded as `0`.
+ Rollback = 0,
+}
+
+impl FromStr for CommandBlock {
+    type Err = CoreError;
+
+    /// Parses the decimal string form of a command block type.
+    ///
+    /// Only `"0"` (rollback) is recognised. Any other `u32` value, or a string that is not a
+    /// valid `u32`, yields `CoreError::LogFormatError`.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let code = s.parse::<u32>().map_err(|e| {
+            CoreError::LogFormatError(format!("Failed to parse command block type: {e}"))
+        })?;
+        if code == 0 {
+            Ok(CommandBlock::Rollback)
+        } else {
+            Err(CoreError::LogFormatError(format!(
+                "Invalid command block type value: {code}"
+            )))
+        }
+    }
+}
+
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct LogBlock {
@@ -153,11 +175,62 @@ impl LogBlock {
}
Ok(batches)
}
+ BlockType::Command => Ok(Vec::new()),
_ => Err(CoreError::LogBlockError(format!(
"Unsupported block type: {block_type:?}"
))),
}
}
+
+    /// Returns the instant time stored in this block's header.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::LogBlockError` when the header has no
+    /// `BlockMetadataKey::InstantTime` entry.
+    pub fn instant_time(&self) -> Result<&str> {
+        let v = self
+            .header
+            .get(&BlockMetadataKey::InstantTime)
+            .ok_or_else(|| CoreError::LogBlockError("Instant time not found".to_string()))?;
+        Ok(v)
+    }
+
+    /// Returns the target instant time stored in a command block's header.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::LogBlockError` when this block is not a command block, or when the
+    /// header has no `BlockMetadataKey::TargetInstantTime` entry.
+    pub fn target_instant_time(&self) -> Result<&str> {
+        if self.block_type != BlockType::Command {
+            return Err(CoreError::LogBlockError(
+                "Target instant time is only available for command blocks".to_string(),
+            ));
+        }
+        let v = self
+            .header
+            .get(&BlockMetadataKey::TargetInstantTime)
+            .ok_or_else(|| CoreError::LogBlockError("Target instant time not found".to_string()))?;
+        Ok(v)
+    }
+
+    /// Returns the schema string stored in this block's header.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::LogBlockError` when the header has no `BlockMetadataKey::Schema`
+    /// entry.
+    pub fn schema(&self) -> Result<&str> {
+        let v = self
+            .header
+            .get(&BlockMetadataKey::Schema)
+            .ok_or_else(|| CoreError::LogBlockError("Schema not found".to_string()))?;
+        Ok(v)
+    }
+
+    /// Returns the command type of a command block, parsed from its header.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::LogBlockError` when this block is not a command block or the header
+    /// has no `BlockMetadataKey::CommandBlockType` entry, and the parse error of
+    /// `CommandBlock::from_str` when the stored value is not a known command type.
+    pub fn command_block_type(&self) -> Result<CommandBlock> {
+        if self.block_type != BlockType::Command {
+            return Err(CoreError::LogBlockError(
+                "Command block type is only available for command blocks".to_string(),
+            ));
+        }
+        let v = self
+            .header
+            .get(&BlockMetadataKey::CommandBlockType)
+            .ok_or_else(|| {
+                CoreError::LogBlockError(
+                    "Command block type not found for command block".to_string(),
+                )
+            })?;
+        v.parse::<CommandBlock>()
+    }
+
+    /// Returns `true` only when `command_block_type` succeeds and yields
+    /// `CommandBlock::Rollback`; any error from that lookup counts as `false`.
+    pub fn is_rollback_block(&self) -> bool {
+        match self.command_block_type() {
+            Ok(kind) => kind == CommandBlock::Rollback,
+            Err(_) => false,
+        }
+    }
}
#[cfg(test)]
@@ -305,4 +378,25 @@ mod tests {
Ok(())
}
+
+    // "0" is the only value that parses to a command block type.
+    #[test]
+    fn test_valid_rollback_block() {
+        assert_eq!(CommandBlock::from_str("0").unwrap(), CommandBlock::Rollback);
+    }
+
+    // Unknown numeric values and non-numeric input must both be rejected with
+    // `CoreError::LogFormatError`, each with its own message.
+    #[test]
+    fn test_invalid_rollback_block() {
+        assert!(matches!(
+            CommandBlock::from_str("1"),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Invalid command block type value: 1")
+        ));
+        assert!(matches!(
+            CommandBlock::from_str("invalid"),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Failed to parse command block type")
+        ));
+        assert!(matches!(
+            CommandBlock::from_str(""),
+            Err(CoreError::LogFormatError(msg)) if msg.contains("Failed to parse command block type")
+        ));
+    }
}
diff --git a/crates/core/src/file_group/log_file/mod.rs b/crates/core/src/file_group/log_file/mod.rs
index 2012861..d473bc2 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -25,6 +25,7 @@ use std::str::FromStr;
mod log_block;
mod log_format;
pub mod reader;
+pub mod scanner;
#[derive(Clone, Debug)]
pub struct LogFile {
diff --git a/crates/core/src/file_group/log_file/reader.rs b/crates/core/src/file_group/log_file/reader.rs
index 07a990d..7ebe270 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -28,7 +28,6 @@ use crate::storage::reader::StorageReader;
use crate::storage::Storage;
use crate::timeline::selector::InstantRange;
use crate::Result;
-use arrow_array::RecordBatch;
use bytes::BytesMut;
use std::collections::HashMap;
use std::io::{self, Read, Seek};
@@ -65,7 +64,7 @@ impl LogFileReader<StorageReader> {
})
}
- fn read_all_blocks(&mut self, instant_range: &InstantRange) ->
Result<Vec<LogBlock>> {
+ pub fn read_all_blocks(&mut self, instant_range: &InstantRange) ->
Result<Vec<LogBlock>> {
let mut blocks = Vec::new();
while let Some(block) = self.read_next_block(instant_range)? {
if block.skipped {
@@ -75,18 +74,6 @@ impl LogFileReader<StorageReader> {
}
Ok(blocks)
}
-
- pub fn read_all_records_unmerged(
- &mut self,
- instant_range: &InstantRange,
- ) -> Result<Vec<RecordBatch>> {
- let all_blocks = self.read_all_blocks(instant_range)?;
- let mut batches = Vec::new();
- for block in all_blocks {
- batches.extend_from_slice(&block.record_batches);
- }
- Ok(batches)
- }
}
impl<R: Read + Seek> LogFileReader<R> {
@@ -289,44 +276,82 @@ impl<R: Read + Seek> LogFileReader<R> {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::file_group::log_file::log_block::CommandBlock;
+ use crate::storage::util::parse_uri;
use std::fs::canonicalize;
use std::path::PathBuf;
- use url::Url;
- fn get_sample_log_file() -> (PathBuf, String) {
+ fn get_valid_log_parquet() -> (String, String) {
let dir = PathBuf::from("tests/data/log_files/valid_log_parquet");
(
- canonicalize(dir).unwrap(),
+ canonicalize(dir).unwrap().to_str().unwrap().to_string(),
".ee2ace10-7667-40f5-9848-0a144b5ea064-0_20250113230302428.log.1_0-188-387".to_string(),
)
}
- #[tokio::test]
- async fn test_read_sample_log_file() {
- let (dir, file_name) = get_sample_log_file();
- let dir_url = Url::from_directory_path(dir).unwrap();
+    // Returns (canonicalized fixture directory, log file name) for the rollback log fixture.
+    fn get_valid_log_rollback() -> (String, String) {
+        let dir = PathBuf::from("tests/data/log_files/valid_log_rollback");
+        (
+            canonicalize(dir).unwrap().to_str().unwrap().to_string(),
+            ".0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1".to_string(),
+        )
+    }
+
+ async fn create_log_file_reader(
+ dir: &str,
+ file_name: &str,
+ ) -> Result<LogFileReader<StorageReader>> {
+ let dir_url = parse_uri(dir)?;
let hudi_configs = Arc::new(HudiConfigs::empty());
- let storage = Storage::new_with_base_url(dir_url).unwrap();
- let mut reader = LogFileReader::new(hudi_configs, storage, &file_name)
- .await
- .unwrap();
+ let storage = Storage::new_with_base_url(dir_url)?;
+ LogFileReader::new(hudi_configs, storage, file_name).await
+ }
+
+ #[tokio::test]
+ async fn test_read_log_file_with_parquet_data_block() -> Result<()> {
+ let (dir, file_name) = get_valid_log_parquet();
+ let mut reader = create_log_file_reader(&dir, &file_name).await?;
let instant_range = InstantRange::up_to("20250113230424191", "utc");
- let blocks = reader.read_all_blocks(&instant_range).unwrap();
+ let blocks = reader.read_all_blocks(&instant_range)?;
assert_eq!(blocks.len(), 1);
let block = &blocks[0];
assert_eq!(block.format_version, LogFormatVersion::V1);
assert_eq!(block.block_type, BlockType::ParquetData);
assert_eq!(block.header.len(), 2);
- assert_eq!(
- block.header.get(&BlockMetadataKey::InstantTime).unwrap(),
- "20250113230424191"
- );
- assert!(block.header.contains_key(&BlockMetadataKey::Schema));
- assert_eq!(block.footer.len(), 0);
+ assert_eq!(block.instant_time()?, "20250113230424191");
+ assert!(block.target_instant_time().is_err());
+ assert!(block.schema().is_ok());
+ assert!(block.command_block_type().is_err());
let batches = block.record_batches.as_slice();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 1);
+
+ assert!(block.footer.is_empty());
+
+ Ok(())
+ }
+
+ // Reads the rollback log fixture and expects exactly one command block: a rollback with
+ // three header entries, no schema entry, no record batches and an empty footer.
+ #[tokio::test]
+ async fn test_read_log_file_with_rollback_block() -> Result<()> {
+ let (dir, file_name) = get_valid_log_rollback();
+ let mut reader = create_log_file_reader(&dir, &file_name).await?;
+ let instant_range = InstantRange::up_to("20250126040936578", "utc");
+ let blocks = reader.read_all_blocks(&instant_range)?;
+ assert_eq!(blocks.len(), 1);
+
+ let block = &blocks[0];
+ assert_eq!(block.format_version, LogFormatVersion::V1);
+ assert_eq!(block.block_type, BlockType::Command);
+ assert_eq!(block.header.len(), 3);
+ assert_eq!(block.instant_time()?, "20250126040936578");
+ assert_eq!(block.target_instant_time()?, "20250126040826878");
+ assert!(block.schema().is_err());
+ assert_eq!(block.command_block_type()?, CommandBlock::Rollback);
+ assert!(block.record_batches.is_empty());
+ assert!(block.footer.is_empty());
+
+ Ok(())
}
}
diff --git a/crates/core/src/file_group/log_file/scanner.rs b/crates/core/src/file_group/log_file/scanner.rs
new file mode 100644
index 0000000..d6ef227
--- /dev/null
+++ b/crates/core/src/file_group/log_file/scanner.rs
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::HudiConfigs;
+use crate::file_group::log_file::log_block::LogBlock;
+use crate::file_group::log_file::reader::LogFileReader;
+use crate::storage::Storage;
+use crate::timeline::selector::InstantRange;
+use crate::Result;
+use arrow_array::RecordBatch;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+/// Scans a set of log files and collects their record batches, leaving out blocks whose
+/// instant time is the target of a rollback command block (see `scan`).
+#[derive(Debug)]
+pub struct LogFileScanner {
+ // Handed to every `LogFileReader` created by `scan`.
+ hudi_configs: Arc<HudiConfigs>,
+ // Handed to every `LogFileReader` created by `scan`.
+ storage: Arc<Storage>,
+}
+
+impl LogFileScanner {
+    /// Creates a scanner that opens log files through `storage` using `hudi_configs`.
+    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+        }
+    }
+
+    /// Reads all blocks of the log files at `relative_paths` (restricted by `instant_range`)
+    /// and returns the record batches of every block that was not rolled back.
+    ///
+    /// Rollback targets are gathered from all files before any block is filtered, so a
+    /// rollback block in one file also drops matching blocks read from another file.
+    /// The result holds one inner `Vec` per retained block, in read order; an inner `Vec`
+    /// may be empty.
+    ///
+    /// # Errors
+    ///
+    /// Propagates errors from opening or reading a log file, from a rollback block without
+    /// a target instant time, and from a block without an instant time.
+    pub async fn scan(
+        &self,
+        relative_paths: Vec<String>,
+        instant_range: &InstantRange,
+    ) -> Result<Vec<Vec<RecordBatch>>> {
+        let mut all_blocks: Vec<Vec<LogBlock>> = Vec::with_capacity(relative_paths.len());
+        let mut rollback_targets: HashSet<String> = HashSet::new();
+
+        // collect all blocks and rollback targets
+        for path in relative_paths {
+            let mut reader =
+                LogFileReader::new(self.hudi_configs.clone(), self.storage.clone(), &path).await?;
+            let blocks = reader.read_all_blocks(instant_range)?;
+
+            for block in &blocks {
+                if block.is_rollback_block() {
+                    rollback_targets.insert(block.target_instant_time()?.to_string());
+                }
+            }
+
+            // only rollback and parquet data blocks are supported
+            // TODO: support more block types
+            // push the whole vector to avoid cloning
+            all_blocks.push(blocks);
+        }
+
+        // collect valid record batches
+        let mut record_batches: Vec<Vec<RecordBatch>> = Vec::new();
+        for blocks in all_blocks {
+            for block in blocks {
+                if !rollback_targets.contains(block.instant_time()?) {
+                    record_batches.push(block.record_batches);
+                }
+            }
+        }
+
+        Ok(record_batches)
+    }
+}
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index ce2ae1c..6b875d2 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -30,7 +30,7 @@ use arrow_schema::Schema;
use futures::TryFutureExt;
use std::sync::Arc;
-use crate::file_group::log_file::reader::LogFileReader;
+use crate::file_group::log_file::scanner::LogFileScanner;
use crate::merge::record_merger::RecordMerger;
use crate::timeline::selector::InstantRange;
use arrow::compute::filter_record_batch;
@@ -129,27 +129,29 @@ impl FileGroupReader {
) -> Result<RecordBatch> {
let relative_path = file_slice.base_file_relative_path()?;
if base_file_only {
- // TODO caller to support read optimized queries
self.read_file_slice_by_base_file_path(&relative_path).await
} else {
- let base_file_records = self
+ let base_record_batch = self
.read_file_slice_by_base_file_path(&relative_path)
.await?;
- let schema = base_file_records.schema();
- let mut all_records = vec![base_file_records];
-
- for log_file in &file_slice.log_files {
- let relative_path =
file_slice.log_file_relative_path(log_file)?;
- let hudi_configs = self.hudi_configs.clone();
- let storage = self.storage.clone();
- let mut log_file_reader =
- LogFileReader::new(hudi_configs, storage,
&relative_path).await?;
- let log_file_records =
log_file_reader.read_all_records_unmerged(&instant_range)?;
- all_records.extend_from_slice(&log_file_records);
+ let schema = base_record_batch.schema();
+ let mut all_record_batches = vec![base_record_batch];
+
+ let log_file_paths = file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?;
+ let log_record_batches =
+ LogFileScanner::new(self.hudi_configs.clone(),
self.storage.clone())
+ .scan(log_file_paths, &instant_range)
+ .await?;
+ for log_record_batch in log_record_batches {
+ all_record_batches.extend(log_record_batch);
}
let merger = RecordMerger::new(self.hudi_configs.clone());
- merger.merge_record_batches(&schema, &all_records)
+ merger.merge_record_batches(&schema, &all_record_batches)
}
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index ac58679..5154a04 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -993,6 +993,26 @@ mod tests {
Ok(())
}
+    // Snapshot read of the MOR rollback sample table: only the three rows listed below
+    // are expected.
+    #[tokio::test]
+    async fn test_non_partitioned_rollback() -> Result<()> {
+        let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor();
+        let hudi_table = Table::new(base_url.path()).await?;
+        let records = hudi_table.read_snapshot(&[]).await?;
+        let schema = &records[0].schema();
+        let records = concat_batches(schema, &records)?;
+
+        let sample_data = SampleTable::sample_data_order_by_id(&records);
+        assert_eq!(
+            sample_data,
+            vec![
+                (1, "Alice", true), // this was updated to false then rolled back to true
+                (2, "Bob", true),   // this was updated to true after rollback
+                (3, "Carol", true),
+            ]
+        );
+        Ok(())
+    }
+
#[tokio::test]
async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
diff --git a/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1 b/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1
new file mode 100644
index 0000000..fbc4778
Binary files /dev/null and b/crates/core/tests/data/log_files/valid_log_rollback/.0712b9f9-d2d5-4cae-bcf4-8fd7146af503-0_20250126040823628.log.2_1-0-1 differ
diff --git a/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql
new file mode 100644
index 0000000..23ef3cf
--- /dev/null
+++ b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.sql
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- MOR sample table used to exercise reading log files that contain a rollback.
+CREATE TABLE v6_nonpartitioned_rollback (
+    id INT,
+    name STRING,
+    isActive BOOLEAN,
+    byteField BYTE,
+    shortField SHORT,
+    intField INT,
+    longField LONG,
+    floatField FLOAT,
+    doubleField DOUBLE,
+    decimalField DECIMAL(10,5),
+    dateField DATE,
+    timestampField TIMESTAMP,
+    binaryField BINARY,
+    arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
+    mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>, -- Map with struct values
+    structField STRUCT<
+        field1: STRING,
+        field2: INT,
+        child_struct: STRUCT<
+            child_field1: DOUBLE,
+            child_field2: BOOLEAN
+        >
+    >
+)
+    USING HUDI
+    location '/opt/data/external_tables/v6_nonpartitioned_rollback'
+TBLPROPERTIES (
+    type = 'mor',
+    primaryKey = 'id',
+    preCombineField = 'longField',
+    'hoodie.metadata.enable' = 'false',
+    'hoodie.table.log.file.format' = 'PARQUET',
+    'hoodie.logfile.data.block.format' = 'parquet',
+    'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
+    'hoodie.parquet.small.file.limit' = '0'
+);
+
+-- Initial write: ids 1, 2 and 3.
+INSERT INTO v6_nonpartitioned_rollback VALUES
+    (1, 'Alice', true, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+     ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+     MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+     STRUCT('Alice', 30, STRUCT(123.456, true))
+    ),
+    (2, 'Bob', false, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+     ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+     MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+     STRUCT('Bob', 40, STRUCT(789.012, false))
+    ),
+    (3, 'Carol', true, 1, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
+     ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
+     MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
+     STRUCT('Carol', 25, STRUCT(456.789, true))
+    );
+
+-- Second write: sets isActive to false for id 1 and adds id 4.
+INSERT INTO v6_nonpartitioned_rollback VALUES
+    (1, 'Alice', false, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+     ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+     MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+     STRUCT('Alice', 30, STRUCT(123.456, true))
+    ),
+    (4, 'Diana', true, 1, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+     ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
+     MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
+     STRUCT('Diana', 50, STRUCT(987.654, true))
+    );
+
+-- Roll the table back to the given instant.
+call rollback_to_instant(table => 'v6_nonpartitioned_rollback', instant_time => '20250126035006837');
+
+-- Write after the rollback: sets isActive to true for id 2.
+INSERT INTO v6_nonpartitioned_rollback VALUES
+    (2, 'Bob', true, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+     ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+     MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+     STRUCT('Bob', 40, STRUCT(789.012, false))
+    );
diff --git a/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip
new file mode 100644
index 0000000..aa2ee74
Binary files /dev/null and b/crates/test/data/tables/mor/v6_nonpartitioned_rollback.zip differ
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index e99e857..1df1a1f 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -41,6 +41,7 @@ pub enum SampleTable {
V6ComplexkeygenHivestyle,
V6Empty,
V6Nonpartitioned,
+ V6NonpartitionedRollback,
V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle,
V6SimplekeygenNonhivestyleOverwritetable,
@@ -100,10 +101,6 @@ impl SampleTable {
path_buf.to_str().unwrap().to_string()
}
- pub fn paths(&self) -> Vec<String> {
- vec![self.path_to_cow(), self.path_to_mor()]
- }
-
pub fn url_to_cow(&self) -> Url {
let path = self.path_to_cow();
Url::from_file_path(path).unwrap()
@@ -128,9 +125,22 @@ mod tests {
#[test]
fn sample_table_zip_file_should_exist() {
for t in SampleTable::iter() {
- let path = t.zip_path("cow");
- assert!(path.exists());
- assert!(path.is_file());
+            // Two tables are checked for a single table type only; all others for both.
+            let table_types: &[&str] = match t {
+                SampleTable::V6TimebasedkeygenNonhivestyle => &["cow"],
+                SampleTable::V6NonpartitionedRollback => &["mor"],
+                _ => &["cow", "mor"],
+            };
+            for &table_type in table_types {
+                let path = t.zip_path(table_type);
+                assert!(path.exists());
+                // `exists()` is also true for a directory; keep the stricter check the
+                // removed lines had.
+                assert!(path.is_file());
+            }
}
}
}