This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 9920d5c fix: handle replacecommit for loading file slices (#53)
9920d5c is described below
commit 9920d5cdb79ab557d331a404f812f8387f869699
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jul 7 04:12:28 2024 -0500
fix: handle replacecommit for loading file slices (#53)
Filter out file slices that belong to replaced file groups.
Also add a full table test case
`v6_simplekeygen_nonhivestyle_overwritetable`.
---
crates/core/src/file_group/mod.rs | 16 ++++
crates/core/src/table/fs_view.rs | 25 +++++-
crates/core/src/table/mod.rs | 6 +-
crates/core/src/table/timeline.rs | 84 +++++++++++++-----
crates/datafusion/src/lib.rs | 41 ++++++++-
...v6_simplekeygen_nonhivestyle_overwritetable.sql | 96 +++++++++++++++++++++
...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin 0 -> 36037 bytes
crates/tests/src/lib.rs | 1 +
8 files changed, 241 insertions(+), 28 deletions(-)
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index c0af0b3..3dd1af3 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,7 @@
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
+use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use anyhow::{anyhow, Result};
@@ -116,6 +117,21 @@ pub struct FileGroup {
pub file_slices: BTreeMap<String, FileSlice>,
}
+impl PartialEq for FileGroup {
+ fn eq(&self, other: &Self) -> bool {
+ self.id == other.id && self.partition_path == other.partition_path
+ }
+}
+
+impl Eq for FileGroup {}
+
+impl Hash for FileGroup {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.id.hash(state);
+ self.partition_path.hash(state);
+ }
+}
+
impl fmt::Display for FileGroup {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index bdc05fc..974bbbc 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,7 +17,7 @@
* under the License.
*/
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{anyhow, Result};
@@ -121,11 +121,18 @@ impl FileSystemView {
Ok(file_groups)
}
-    pub fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+ pub fn get_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ excluding_file_groups: &HashSet<FileGroup>,
+ ) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
for fg in fgs_ref {
+ if excluding_file_groups.contains(fg) {
+ continue;
+ }
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
// TODO: pass ref instead of copying
file_slices.push(fsl.clone());
@@ -135,10 +142,17 @@ impl FileSystemView {
Ok(file_slices)
}
-    pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> Result<()> {
+ pub async fn load_file_slices_stats_as_of(
+ &self,
+ timestamp: &str,
+ exclude_file_groups: &HashSet<FileGroup>,
+ ) -> Result<()> {
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
+ if exclude_file_groups.contains(fg) {
+ continue;
+ }
                if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
file_slice
.load_stats(&self.storage)
@@ -215,7 +229,10 @@ mod tests {
.await
.unwrap();
-        let file_slices = fs_view.get_file_slices_as_of("20240418173551906").unwrap();
+ let excludes = HashSet::new();
+ let file_slices = fs_view
+ .get_file_slices_as_of("20240418173551906", &excludes)
+ .unwrap();
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index bbc5673..07126f1 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -182,11 +182,13 @@ impl Table {
}
    async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+ let excludes = self.timeline.get_replaced_file_groups().await?;
self.file_system_view
- .load_file_slices_stats_as_of(timestamp)
+ .load_file_slices_stats_as_of(timestamp, &excludes)
.await
.context("Fail to load file slice stats.")?;
- self.file_system_view.get_file_slices_as_of(timestamp)
+ self.file_system_view
+ .get_file_slices_as_of(timestamp, &excludes)
}
pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 9010738..ae92bac 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -17,8 +17,8 @@
* under the License.
*/
-use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::cmp::{Ordering, PartialOrd};
+use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
@@ -30,6 +30,7 @@ use serde_json::{Map, Value};
use url::Url;
use crate::config::HudiConfigs;
+use crate::file_group::FileGroup;
use crate::storage::utils::split_filename;
use crate::storage::Storage;
@@ -72,6 +73,19 @@ impl Instant {
pub fn file_name(&self) -> String {
format!("{}.{}{}", self.timestamp, self.action, self.state_suffix())
}
+
+ pub fn relative_path(&self) -> Result<String> {
+ let mut commit_file_path = PathBuf::from(".hoodie");
+ commit_file_path.push(self.file_name());
+ commit_file_path
+ .to_str()
+ .ok_or(anyhow!("Failed to get file path for {:?}", self))
+ .map(|s| s.to_string())
+ }
+
+ pub fn is_replacecommit(&self) -> bool {
+ self.action == "replacecommit"
+ }
}
#[derive(Clone, Debug)]
@@ -89,7 +103,7 @@ impl Timeline {
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
- let instants = Self::load_completed_commit_instants(&storage).await?;
+ let instants = Self::load_completed_commits(&storage).await?;
Ok(Self {
storage,
configs,
@@ -97,15 +111,15 @@ impl Timeline {
})
}
-    async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
+    async fn load_completed_commits(storage: &Storage) -> Result<Vec<Instant>> {
let mut completed_commits = Vec::new();
for file_info in storage.list_files(Some(".hoodie")).await? {
let (file_stem, file_ext) =
split_filename(file_info.name.as_str())?;
- if file_ext == "commit" {
+ if matches!(file_ext.as_str(), "commit" | "replacecommit") {
completed_commits.push(Instant {
state: State::Completed,
timestamp: file_stem,
- action: "commit".to_owned(),
+ action: file_ext.to_owned(),
})
}
}
@@ -120,23 +134,22 @@ impl Timeline {
.map(|instant| instant.timestamp.as_str())
}
+    async fn get_commit_metadata(&self, instant: &Instant) -> Result<Map<String, Value>> {
+ let bytes = self
+ .storage
+ .get_file_data(instant.relative_path()?.as_str())
+ .await?;
+ let json: Value = serde_json::from_slice(&bytes)?;
+ let commit_metadata = json
+ .as_object()
+ .ok_or_else(|| anyhow!("Expected JSON object"))?
+ .clone();
+ Ok(commit_metadata)
+ }
+
async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
match self.instants.iter().next_back() {
- Some(instant) => {
- let mut commit_file_path = PathBuf::from(".hoodie");
- commit_file_path.push(instant.file_name());
- let relative_path = commit_file_path.to_str().ok_or(anyhow!(
- "Failed to get commit file path for instant: {:?}",
- instant
- ))?;
- let bytes = self.storage.get_file_data(relative_path).await?;
- let json: Value = serde_json::from_slice(&bytes)?;
- let commit_metadata = json
- .as_object()
- .ok_or_else(|| anyhow!("Expected JSON object"))?
- .clone();
- Ok(commit_metadata)
- }
+ Some(instant) => self.get_commit_metadata(instant).await,
None => Ok(Map::new()),
}
}
@@ -167,6 +180,35 @@ impl Timeline {
))
}
}
+
+    pub async fn get_replaced_file_groups(&self) -> Result<HashSet<FileGroup>> {
+ let mut fgs: HashSet<FileGroup> = HashSet::new();
+ for instant in self.instants.iter().filter(|i| i.is_replacecommit()) {
+ let commit_metadata = self.get_commit_metadata(instant).await?;
+            if let Some(ptn_to_replaced) = commit_metadata.get("partitionToReplaceFileIds") {
+ for (ptn, fg_ids) in ptn_to_replaced
+ .as_object()
+ .expect("partitionToReplaceFileIds should be a map")
+ {
+ let fg_ids = fg_ids
+ .as_array()
+ .expect("file group ids should be an array")
+ .iter()
+                    .map(|fg_id| fg_id.as_str().expect("file group id should be a string"));
+
+ let ptn = Some(ptn.to_string()).filter(|s| !s.is_empty());
+
+ for fg_id in fg_ids {
+                    fgs.insert(FileGroup::new(fg_id.to_string(), ptn.clone()));
+ }
+ }
+ }
+ }
+
+        // TODO: return file group and instants, and handle multi-writer fg id conflicts
+
+ Ok(fgs)
+ }
}
#[cfg(test)]
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f7a2c43..fe23015 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -138,7 +138,8 @@ mod tests {
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
V6ComplexkeygenHivestyle, V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
- V6SimplekeygenNonhivestyle, V6TimebasedkeygenNonhivestyle,
+ V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
+ V6TimebasedkeygenNonhivestyle,
};
use hudi_tests::{utils, TestTable};
use utils::{get_bool_column, get_i32_column, get_str_column};
@@ -228,4 +229,42 @@ mod tests {
verify_data(&ctx, &sql, test_table.as_ref()).await
}
}
+
+    async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, table_name: &str) {
+ let df = ctx.sql(sql).await.unwrap();
+ let rb = df.collect().await.unwrap();
+ let rb = rb.first().unwrap();
+ assert_eq!(get_i32_column(rb, "id"), &[4]);
+ assert_eq!(get_str_column(rb, "name"), &["Diana"]);
+ assert_eq!(get_bool_column(rb, "isActive"), &[false]);
+ assert_eq!(
+ get_i32_column(rb, &format!("{}.structField[field2]", table_name)),
+ &[50]
+ );
+ }
+
+ #[tokio::test]
+ async fn datafusion_read_hudi_table_with_replacecommits() {
+ for (test_table, planned_input_partitions) in
+ &[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
+ {
+ println!(">>> testing for {}", test_table.as_ref());
+ let ctx = prepare_session_context(
+ test_table,
+ &[(InputPartitions.as_ref().to_string(), "2".to_string())],
+ )
+ .await;
+
+ let sql = format!(
+ r#"
+ SELECT id, name, isActive, structField.field2
+ FROM {} WHERE id % 2 = 0
+ AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+ test_table.as_ref()
+ );
+
+            verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
+            verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
+ }
+ }
}
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
new file mode 100644
index 0000000..df90fbe
--- /dev/null
+++ b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
+ id INT,
+ name STRING,
+ isActive BOOLEAN,
+ shortField SHORT,
+ intField INT,
+ longField LONG,
+ floatField FLOAT,
+ doubleField DOUBLE,
+ decimalField DECIMAL(10,5),
+ dateField DATE,
+ timestampField TIMESTAMP,
+ binaryField BINARY,
+    arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
+    mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>, -- Map with struct values
+ structField STRUCT<
+ field1: STRING,
+ field2: INT,
+ child_struct: STRUCT<
+ child_field1: DOUBLE,
+ child_field2: BOOLEAN
+ >
+ >,
+ byteField BYTE
+)
+ USING HUDI
+TBLPROPERTIES (
+ type = 'cow',
+ primaryKey = 'id',
+ preCombineField = 'longField',
+ 'hoodie.metadata.enable' = 'false',
+ 'hoodie.datasource.write.hive_style_partitioning' = 'false',
+ 'hoodie.datasource.write.drop.partition.columns' = 'false'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+ (1, 'Alice', true, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (2, 'Bob', false, 100, 25000,
9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE),
CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+ ARRAY(STRUCT('yellow', 400),
STRUCT('purple', 500)),
+ MAP('key3', STRUCT(234.567,
true), 'key4', STRUCT(567.890, false)),
+ STRUCT('Bob', 40,
STRUCT(789.012, false)),
+ 20
+ ),
+ (3, 'Carol', true, 200, 35000,
1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE),
CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS
BINARY),
+ ARRAY(STRUCT('black', 600),
STRUCT('white', 700), STRUCT('pink', 800)),
+ MAP('key5', STRUCT(345.678,
true), 'key6', STRUCT(654.321, false)),
+ STRUCT('Carol', 25,
STRUCT(456.789, true)),
+ 10
+ );
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+ (1, 'Alice', false, 300, 15000,
1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE),
CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+ ARRAY(STRUCT('red', 100),
STRUCT('blue', 200), STRUCT('green', 300)),
+ MAP('key1', STRUCT(123.456,
true), 'key2', STRUCT(789.012, false)),
+ STRUCT('Alice', 30,
STRUCT(123.456, true)),
+ 10
+ ),
+ (4, 'Diana', true, 500, 45000,
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE),
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900),
STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789,
true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50,
STRUCT(987.654, true)),
+ 30
+ );
+
+INSERT OVERWRITE TABLE v6_simplekeygen_nonhivestyle_overwritetable SELECT
+ 4, 'Diana', false, 500, 45000,
987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE),
CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+ ARRAY(STRUCT('orange', 900),
STRUCT('gray', 1000)),
+ MAP('key7', STRUCT(456.789,
true), 'key8', STRUCT(123.456, false)),
+ STRUCT('Diana', 50,
STRUCT(987.654, true)),
+ 30
+ ;
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip
new file mode 100644
index 0000000..c3ab4f3
Binary files /dev/null and b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index 22b2f3e..457ae4e 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -42,6 +42,7 @@ pub enum TestTable {
V6Nonpartitioned,
V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle,
+ V6SimplekeygenNonhivestyleOverwritetable,
V6TimebasedkeygenNonhivestyle,
}