This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 9920d5c  fix: handle replacecommit for loading file slices (#53)
9920d5c is described below

commit 9920d5cdb79ab557d331a404f812f8387f869699
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jul 7 04:12:28 2024 -0500

    fix: handle replacecommit for loading file slices (#53)
    
    Filter out file slices that belong to replaced file groups.
    
    Also add a full table test case `v6_simplekeygen_nonhivestyle_overwritetable`.
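    
    For context, the filtering works roughly like the standalone sketch below
    (hypothetical names, not the crate's actual API): collect the file group ids
    listed under "partitionToReplaceFileIds" in each replacecommit's metadata
    into a set keyed by (id, partition), then skip those file groups when
    listing file slices.
    
        use std::collections::{HashMap, HashSet};
    
        // Hypothetical stand-in for a file group's identity (id + partition).
        #[derive(Clone, Debug, PartialEq, Eq, Hash)]
        struct FileGroupKey {
            id: String,
            partition_path: Option<String>,
        }
    
        // Build the exclusion set from a replacecommit's
        // "partitionToReplaceFileIds" map (partition -> replaced file group ids).
        fn replaced_file_groups(
            partition_to_replaced: &HashMap<String, Vec<String>>,
        ) -> HashSet<FileGroupKey> {
            let mut replaced = HashSet::new();
            for (partition, fg_ids) in partition_to_replaced {
                // Non-partitioned tables use an empty partition path; normalize to None.
                let partition = Some(partition.clone()).filter(|p| !p.is_empty());
                for id in fg_ids {
                    replaced.insert(FileGroupKey {
                        id: id.clone(),
                        partition_path: partition.clone(),
                    });
                }
            }
            replaced
        }
    
        fn main() {
            let mut meta = HashMap::new();
            meta.insert("10".to_string(), vec!["fg-1".to_string()]);
            let excludes = replaced_file_groups(&meta);
    
            // File-slice listing then skips any file group found in this set.
            let fg = FileGroupKey { id: "fg-1".to_string(), partition_path: Some("10".to_string()) };
            assert!(excludes.contains(&fg));
        }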
---
 crates/core/src/file_group/mod.rs                  |  16 ++++
 crates/core/src/table/fs_view.rs                   |  25 +++++-
 crates/core/src/table/mod.rs                       |   6 +-
 crates/core/src/table/timeline.rs                  |  84 +++++++++++++-----
 crates/datafusion/src/lib.rs                       |  41 ++++++++-
 ...v6_simplekeygen_nonhivestyle_overwritetable.sql |  96 +++++++++++++++++++++
 ...v6_simplekeygen_nonhivestyle_overwritetable.zip | Bin 0 -> 36037 bytes
 crates/tests/src/lib.rs                            |   1 +
 8 files changed, 241 insertions(+), 28 deletions(-)

diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index c0af0b3..3dd1af3 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,7 @@
 use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
+use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
 use anyhow::{anyhow, Result};
@@ -116,6 +117,21 @@ pub struct FileGroup {
     pub file_slices: BTreeMap<String, FileSlice>,
 }
 
+impl PartialEq for FileGroup {
+    fn eq(&self, other: &Self) -> bool {
+        self.id == other.id && self.partition_path == other.partition_path
+    }
+}
+
+impl Eq for FileGroup {}
+
+impl Hash for FileGroup {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state);
+        self.partition_path.hash(state);
+    }
+}
+
 impl fmt::Display for FileGroup {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.write_str(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index bdc05fc..974bbbc 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
@@ -121,11 +121,18 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+    pub fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        excluding_file_groups: &HashSet<FileGroup>,
+    ) -> Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
         for fgs in self.partition_to_file_groups.iter() {
             let fgs_ref = fgs.value();
             for fg in fgs_ref {
+                if excluding_file_groups.contains(fg) {
+                    continue;
+                }
                 if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
                     // TODO: pass ref instead of copying
                     file_slices.push(fsl.clone());
@@ -135,10 +142,17 @@ impl FileSystemView {
         Ok(file_slices)
     }
 
-    pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> Result<()> {
+    pub async fn load_file_slices_stats_as_of(
+        &self,
+        timestamp: &str,
+        exclude_file_groups: &HashSet<FileGroup>,
+    ) -> Result<()> {
         for mut fgs in self.partition_to_file_groups.iter_mut() {
             let fgs_ref = fgs.value_mut();
             for fg in fgs_ref {
+                if exclude_file_groups.contains(fg) {
+                    continue;
+                }
+                if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
                     file_slice
                         .load_stats(&self.storage)
@@ -215,7 +229,10 @@ mod tests {
         .await
         .unwrap();
 
-        let file_slices = fs_view.get_file_slices_as_of("20240418173551906").unwrap();
+        let excludes = HashSet::new();
+        let file_slices = fs_view
+            .get_file_slices_as_of("20240418173551906", &excludes)
+            .unwrap();
         assert_eq!(file_slices.len(), 1);
         let fg_ids = file_slices
             .iter()
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index bbc5673..07126f1 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -182,11 +182,13 @@ impl Table {
     }
 
     async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+        let excludes = self.timeline.get_replaced_file_groups().await?;
         self.file_system_view
-            .load_file_slices_stats_as_of(timestamp)
+            .load_file_slices_stats_as_of(timestamp, &excludes)
             .await
             .context("Fail to load file slice stats.")?;
-        self.file_system_view.get_file_slices_as_of(timestamp)
+        self.file_system_view
+            .get_file_slices_as_of(timestamp, &excludes)
     }
 
     pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 9010738..ae92bac 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -17,8 +17,8 @@
  * under the License.
  */
 
-use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::cmp::{Ordering, PartialOrd};
+use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -30,6 +30,7 @@ use serde_json::{Map, Value};
 use url::Url;
 
 use crate::config::HudiConfigs;
+use crate::file_group::FileGroup;
 use crate::storage::utils::split_filename;
 use crate::storage::Storage;
 
@@ -72,6 +73,19 @@ impl Instant {
     pub fn file_name(&self) -> String {
         format!("{}.{}{}", self.timestamp, self.action, self.state_suffix())
     }
+
+    pub fn relative_path(&self) -> Result<String> {
+        let mut commit_file_path = PathBuf::from(".hoodie");
+        commit_file_path.push(self.file_name());
+        commit_file_path
+            .to_str()
+            .ok_or(anyhow!("Failed to get file path for {:?}", self))
+            .map(|s| s.to_string())
+    }
+
+    pub fn is_replacecommit(&self) -> bool {
+        self.action == "replacecommit"
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -89,7 +103,7 @@ impl Timeline {
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
         let storage = Storage::new(base_url, &storage_options)?;
-        let instants = Self::load_completed_commit_instants(&storage).await?;
+        let instants = Self::load_completed_commits(&storage).await?;
         Ok(Self {
             storage,
             configs,
@@ -97,15 +111,15 @@ impl Timeline {
         })
     }
 
-    async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
+    async fn load_completed_commits(storage: &Storage) -> Result<Vec<Instant>> {
         let mut completed_commits = Vec::new();
         for file_info in storage.list_files(Some(".hoodie")).await? {
-            let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
-            if file_ext == "commit" {
+            if matches!(file_ext.as_str(), "commit" | "replacecommit") {
                 completed_commits.push(Instant {
                     state: State::Completed,
                     timestamp: file_stem,
-                    action: "commit".to_owned(),
+                    action: file_ext.to_owned(),
                 })
             }
         }
@@ -120,23 +134,22 @@ impl Timeline {
             .map(|instant| instant.timestamp.as_str())
     }
 
+    async fn get_commit_metadata(&self, instant: &Instant) -> Result<Map<String, Value>> {
+        let bytes = self
+            .storage
+            .get_file_data(instant.relative_path()?.as_str())
+            .await?;
+        let json: Value = serde_json::from_slice(&bytes)?;
+        let commit_metadata = json
+            .as_object()
+            .ok_or_else(|| anyhow!("Expected JSON object"))?
+            .clone();
+        Ok(commit_metadata)
+    }
+
     async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
         match self.instants.iter().next_back() {
-            Some(instant) => {
-                let mut commit_file_path = PathBuf::from(".hoodie");
-                commit_file_path.push(instant.file_name());
-                let relative_path = commit_file_path.to_str().ok_or(anyhow!(
-                    "Failed to get commit file path for instant: {:?}",
-                    instant
-                ))?;
-                let bytes = self.storage.get_file_data(relative_path).await?;
-                let json: Value = serde_json::from_slice(&bytes)?;
-                let commit_metadata = json
-                    .as_object()
-                    .ok_or_else(|| anyhow!("Expected JSON object"))?
-                    .clone();
-                Ok(commit_metadata)
-            }
+            Some(instant) => self.get_commit_metadata(instant).await,
             None => Ok(Map::new()),
         }
     }
@@ -167,6 +180,35 @@ impl Timeline {
             ))
         }
     }
+
+    pub async fn get_replaced_file_groups(&self) -> Result<HashSet<FileGroup>> {
+        let mut fgs: HashSet<FileGroup> = HashSet::new();
+        for instant in self.instants.iter().filter(|i| i.is_replacecommit()) {
+            let commit_metadata = self.get_commit_metadata(instant).await?;
+            if let Some(ptn_to_replaced) = commit_metadata.get("partitionToReplaceFileIds") {
+                for (ptn, fg_ids) in ptn_to_replaced
+                    .as_object()
+                    .expect("partitionToReplaceFileIds should be a map")
+                {
+                    let fg_ids = fg_ids
+                        .as_array()
+                        .expect("file group ids should be an array")
+                        .iter()
+                        .map(|fg_id| fg_id.as_str().expect("file group id should be a string"));
+
+                    let ptn = Some(ptn.to_string()).filter(|s| !s.is_empty());
+
+                    for fg_id in fg_ids {
+                        fgs.insert(FileGroup::new(fg_id.to_string(), ptn.clone()));
+                    }
+                }
+            }
+        }
+
+        // TODO: return file group and instants, and handle multi-writer fg id conflicts
+
+        Ok(fgs)
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f7a2c43..fe23015 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -138,7 +138,8 @@ mod tests {
     use hudi_core::config::read::HudiReadConfig::InputPartitions;
     use hudi_tests::TestTable::{
         V6ComplexkeygenHivestyle, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
-        V6SimplekeygenNonhivestyle, V6TimebasedkeygenNonhivestyle,
+        V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
+        V6TimebasedkeygenNonhivestyle,
     };
     use hudi_tests::{utils, TestTable};
     use utils::{get_bool_column, get_i32_column, get_str_column};
@@ -228,4 +229,42 @@ mod tests {
             verify_data(&ctx, &sql, test_table.as_ref()).await
         }
     }
+
+    async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, table_name: &str) {
+        let df = ctx.sql(sql).await.unwrap();
+        let rb = df.collect().await.unwrap();
+        let rb = rb.first().unwrap();
+        assert_eq!(get_i32_column(rb, "id"), &[4]);
+        assert_eq!(get_str_column(rb, "name"), &["Diana"]);
+        assert_eq!(get_bool_column(rb, "isActive"), &[false]);
+        assert_eq!(
+            get_i32_column(rb, &format!("{}.structField[field2]", table_name)),
+            &[50]
+        );
+    }
+
+    #[tokio::test]
+    async fn datafusion_read_hudi_table_with_replacecommits() {
+        for (test_table, planned_input_partitions) in
+            &[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
+        {
+            println!(">>> testing for {}", test_table.as_ref());
+            let ctx = prepare_session_context(
+                test_table,
+                &[(InputPartitions.as_ref().to_string(), "2".to_string())],
+            )
+            .await;
+
+            let sql = format!(
+                r#"
+            SELECT id, name, isActive, structField.field2
+            FROM {} WHERE id % 2 = 0
+            AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
+                test_table.as_ref()
+            );
+
+            verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
+            verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
+        }
+    }
 }
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
new file mode 100644
index 0000000..df90fbe
--- /dev/null
+++ b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.sql
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
+                                              id INT,
+                                              name STRING,
+                                              isActive BOOLEAN,
+                                              shortField SHORT,
+                                              intField INT,
+                                              longField LONG,
+                                              floatField FLOAT,
+                                              doubleField DOUBLE,
+                                              decimalField DECIMAL(10,5),
+                                              dateField DATE,
+                                              timestampField TIMESTAMP,
+                                              binaryField BINARY,
+                                              arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>,  -- Array of structs
+                                              mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>,  -- Map with struct values
+                                              structField STRUCT<
+                                                  field1: STRING,
+                                              field2: INT,
+                                              child_struct: STRUCT<
+                                                  child_field1: DOUBLE,
+                                              child_field2: BOOLEAN
+                                                  >
+                                                  >,
+                                              byteField BYTE
+)
+    USING HUDI
+TBLPROPERTIES (
+    type = 'cow',
+    primaryKey = 'id',
+    preCombineField = 'longField',
+    'hoodie.metadata.enable' = 'false',
+    'hoodie.datasource.write.hive_style_partitioning' = 'false',
+    'hoodie.datasource.write.drop.partition.columns' = 'false'
+)
+PARTITIONED BY (byteField);
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+                                             (1, 'Alice', true, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                              ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+                                              MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+                                              STRUCT('Alice', 30, STRUCT(123.456, true)),
+                                              10
+                                             ),
+                                             (2, 'Bob', false, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
+                                              ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
+                                              MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
+                                              STRUCT('Bob', 40, STRUCT(789.012, false)),
+                                              20
+                                             ),
+                                             (3, 'Carol', true, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
+                                              ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
+                                              MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
+                                              STRUCT('Carol', 25, STRUCT(456.789, true)),
+                                              10
+                                             );
+
+INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
+                                             (1, 'Alice', false, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
+                                              ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
+                                              MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
+                                              STRUCT('Alice', 30, STRUCT(123.456, true)),
+                                              10
+                                             ),
+                                             (4, 'Diana', true, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+                                              ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
+                                              MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
+                                              STRUCT('Diana', 50, STRUCT(987.654, true)),
+                                              30
+                                             );
+
+INSERT OVERWRITE TABLE v6_simplekeygen_nonhivestyle_overwritetable SELECT
+                                              4, 'Diana', false, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
+                                              ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
+                                              MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
+                                              STRUCT('Diana', 50, STRUCT(987.654, true)),
+                                              30
+                                             ;
diff --git a/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip
new file mode 100644
index 0000000..c3ab4f3
Binary files /dev/null and b/crates/tests/data/tables/v6_simplekeygen_nonhivestyle_overwritetable.zip differ
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index 22b2f3e..457ae4e 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -42,6 +42,7 @@ pub enum TestTable {
     V6Nonpartitioned,
     V6SimplekeygenHivestyleNoMetafields,
     V6SimplekeygenNonhivestyle,
+    V6SimplekeygenNonhivestyleOverwritetable,
     V6TimebasedkeygenNonhivestyle,
 }
 
