Copilot commented on code in PR #477:
URL: https://github.com/apache/hudi-rs/pull/477#discussion_r2471465209


##########
crates/core/src/metadata/commit.rs:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents statistics for a single file write operation in a commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieWriteStat {
+    #[avro(rename = "fileId")]
+    pub file_id: Option<String>,
+    pub path: Option<String>,
+    #[avro(rename = "baseFile")]
+    pub base_file: Option<String>,
+    #[avro(rename = "logFiles")]
+    pub log_files: Option<Vec<String>>,
+    #[avro(rename = "prevCommit")]
+    pub prev_commit: Option<String>,
+    #[avro(rename = "numWrites")]
+    pub num_writes: Option<i64>,
+    #[avro(rename = "numDeletes")]
+    pub num_deletes: Option<i64>,
+    #[avro(rename = "numUpdateWrites")]
+    pub num_update_writes: Option<i64>,
+    #[avro(rename = "numInserts")]
+    pub num_inserts: Option<i64>,
+    #[avro(rename = "totalWriteBytes")]
+    pub total_write_bytes: Option<i64>,
+    #[avro(rename = "totalWriteErrors")]
+    pub total_write_errors: Option<i64>,
+}
+
+/// Represents the metadata for a Hudi commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieCommitMetadata::get_schema()`.
+///
+/// # Example
+/// ```
+/// use hudi_core::metadata::commit::HoodieCommitMetadata;
+/// use apache_avro::Schema;
+///
+/// // Get the Avro schema
+/// let schema = HoodieCommitMetadata::get_schema();
+/// println!("Schema: {}", schema.canonical_form());
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieCommitMetadata {
+    pub version: Option<i32>,
+    #[avro(rename = "operationType")]
+    pub operation_type: Option<String>,
+    #[avro(rename = "partitionToWriteStats")]
+    pub partition_to_write_stats: Option<HashMap<String, 
Vec<HoodieWriteStat>>>,
+    #[avro(rename = "partitionToReplaceFileIds")]
+    pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+    pub compacted: Option<bool>,
+    #[avro(rename = "extraMetadata")]
+    pub extra_metadata: Option<HashMap<String, String>>,
+}
+
+impl HoodieCommitMetadata {
+    /// Parse commit metadata from a serde_json Map
+    pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+        serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Parse commit metadata from JSON bytes
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        serde_json::from_slice(bytes).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Get the write stats for a specific partition
+    pub fn get_partition_write_stats(&self, partition: &str) -> 
Option<&Vec<HoodieWriteStat>> {
+        self.partition_to_write_stats
+            .as_ref()
+            .and_then(|stats| stats.get(partition))
+    }
+
+    /// Get all partitions with write stats
+    pub fn get_partitions_with_writes(&self) -> Vec<String> {
+        self.partition_to_write_stats
+            .as_ref()
+            .map(|stats| stats.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Get the file IDs to be replaced for a specific partition
+    pub fn get_partition_replace_file_ids(&self, partition: &str) -> 
Option<&Vec<String>> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .and_then(|ids| ids.get(partition))
+    }
+
+    /// Get all partitions with file replacements
+    pub fn get_partitions_with_replacements(&self) -> Vec<String> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .map(|ids| ids.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Iterate over all write stats across all partitions
+    pub fn iter_write_stats(&self) -> impl Iterator<Item = (&String, 
&HoodieWriteStat)> {
+        self.partition_to_write_stats.iter().flat_map(|stats| {
+            stats.iter().flat_map(|(partition, write_stats)| {
+                write_stats.iter().map(move |stat| (partition, stat))
+            })
+        })
+    }
+
+    /// Iterate over all replace file IDs across all partitions
+    pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String, 
&String)> {
+        self.partition_to_replace_file_ids
+            .iter()
+            .flat_map(|replace_ids| {
+                replace_ids.iter().flat_map(|(partition, file_ids)| {
+                    file_ids.iter().map(move |file_id| (partition, file_id))
+                })
+            })
+    }

Review Comment:
   Note on the iterator chain: `Option::iter()` yields at most one item — a 
`&HashMap` — and the enclosing `flat_map` then iterates that map's entries, so 
this code does visit every `(partition, file_id)` pair and behaves correctly. 
Rewriting it as `.as_ref().into_iter().flatten()` (or via an 
`.as_ref()?.iter()` helper) would express the same traversal more explicitly, 
but that is a stylistic change rather than a bug fix.



##########
crates/core/src/schema/resolver.rs:
##########
@@ -121,55 +122,40 @@ async fn resolve_schema_from_base_file(
     commit_metadata: &Map<String, Value>,
     storage: Arc<Storage>,
 ) -> Result<Schema> {
-    let first_partition = commit_metadata
-        .get("partitionToWriteStats")
-        .and_then(|v| v.as_object());
+    let metadata = HoodieCommitMetadata::from_json_map(commit_metadata)?;
 
-    let partition_path = first_partition
-        .and_then(|obj| obj.keys().next())
-        .ok_or_else(|| {
+    // Get the first write stat from any partition
+    let (partition, first_stat) = 
metadata.iter_write_stats().next().ok_or_else(|| {
+        CoreError::CommitMetadata(
+            "Failed to resolve the latest schema: no write status in commit 
metadata".to_string(),

Review Comment:
   Corrected spelling of 'status' to 'stats' for consistency with 'write stats' 
terminology used throughout the codebase.
   ```suggestion
               "Failed to resolve the latest schema: no write stats in commit 
metadata".to_string(),
   ```



##########
crates/core/src/metadata/commit.rs:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents statistics for a single file write operation in a commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieWriteStat {
+    #[avro(rename = "fileId")]
+    pub file_id: Option<String>,
+    pub path: Option<String>,
+    #[avro(rename = "baseFile")]
+    pub base_file: Option<String>,
+    #[avro(rename = "logFiles")]
+    pub log_files: Option<Vec<String>>,
+    #[avro(rename = "prevCommit")]
+    pub prev_commit: Option<String>,
+    #[avro(rename = "numWrites")]
+    pub num_writes: Option<i64>,
+    #[avro(rename = "numDeletes")]
+    pub num_deletes: Option<i64>,
+    #[avro(rename = "numUpdateWrites")]
+    pub num_update_writes: Option<i64>,
+    #[avro(rename = "numInserts")]
+    pub num_inserts: Option<i64>,
+    #[avro(rename = "totalWriteBytes")]
+    pub total_write_bytes: Option<i64>,
+    #[avro(rename = "totalWriteErrors")]
+    pub total_write_errors: Option<i64>,
+}
+
+/// Represents the metadata for a Hudi commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieCommitMetadata::get_schema()`.
+///
+/// # Example
+/// ```
+/// use hudi_core::metadata::commit::HoodieCommitMetadata;
+/// use apache_avro::Schema;
+///
+/// // Get the Avro schema
+/// let schema = HoodieCommitMetadata::get_schema();
+/// println!("Schema: {}", schema.canonical_form());
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieCommitMetadata {
+    pub version: Option<i32>,
+    #[avro(rename = "operationType")]
+    pub operation_type: Option<String>,
+    #[avro(rename = "partitionToWriteStats")]
+    pub partition_to_write_stats: Option<HashMap<String, 
Vec<HoodieWriteStat>>>,
+    #[avro(rename = "partitionToReplaceFileIds")]
+    pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+    pub compacted: Option<bool>,
+    #[avro(rename = "extraMetadata")]
+    pub extra_metadata: Option<HashMap<String, String>>,
+}
+
+impl HoodieCommitMetadata {
+    /// Parse commit metadata from a serde_json Map
+    pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+        serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Parse commit metadata from JSON bytes
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        serde_json::from_slice(bytes).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Get the write stats for a specific partition
+    pub fn get_partition_write_stats(&self, partition: &str) -> 
Option<&Vec<HoodieWriteStat>> {
+        self.partition_to_write_stats
+            .as_ref()
+            .and_then(|stats| stats.get(partition))
+    }
+
+    /// Get all partitions with write stats
+    pub fn get_partitions_with_writes(&self) -> Vec<String> {
+        self.partition_to_write_stats
+            .as_ref()
+            .map(|stats| stats.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Get the file IDs to be replaced for a specific partition
+    pub fn get_partition_replace_file_ids(&self, partition: &str) -> 
Option<&Vec<String>> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .and_then(|ids| ids.get(partition))
+    }
+
+    /// Get all partitions with file replacements
+    pub fn get_partitions_with_replacements(&self) -> Vec<String> {
+        self.partition_to_replace_file_ids
+            .as_ref()
+            .map(|ids| ids.keys().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    /// Iterate over all write stats across all partitions
+    pub fn iter_write_stats(&self) -> impl Iterator<Item = (&String, 
&HoodieWriteStat)> {
+        self.partition_to_write_stats.iter().flat_map(|stats| {
+            stats.iter().flat_map(|(partition, write_stats)| {
+                write_stats.iter().map(move |stat| (partition, stat))
+            })
+        })

Review Comment:
   Note on the iterator chain: `Option::iter()` yields at most one item — a 
`&HashMap<String, Vec<HoodieWriteStat>>` — and the enclosing `flat_map` then 
iterates that map's entries, so this code does visit every write stat and 
behaves correctly. The form below is equivalent and makes the 
Option-flattening step explicit, but it is a stylistic change rather than a 
bug fix.
   ```suggestion
           self.partition_to_write_stats
               .as_ref()
               .into_iter()
               .flat_map(|stats| {
                   stats.iter().flat_map(|(partition, write_stats)| {
                       write_stats.iter().map(move |stat| (partition, stat))
                   })
               })
   ```



##########
crates/core/src/metadata/replace_commit.rs:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::metadata::commit::HoodieWriteStat;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents the metadata for a Hudi Replace Commit
+///
+/// This is modeled from HoodieReplaceCommitMetadata.avsc.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieReplaceCommitMetadata {
+    // version: ["int","null"] with default 1 in Avro; we model as Option<i32>
+    pub version: Option<i32>,
+    #[avro(rename = "operationType")]
+    pub operation_type: Option<String>,
+    #[avro(rename = "partitionToWriteStats")]
+    pub partition_to_write_stats: Option<HashMap<String, 
Vec<HoodieWriteStat>>>,
+    pub compacted: Option<bool>,
+    #[avro(rename = "extraMetadata")]
+    pub extra_metadata: Option<HashMap<String, String>>,
+    #[avro(rename = "partitionToReplaceFileIds")]
+    pub partition_to_replace_file_ids: Option<HashMap<String, Vec<String>>>,
+}
+
+impl HoodieReplaceCommitMetadata {
+    /// Parse replace commit metadata from a serde_json Map
+    pub fn from_json_map(map: &Map<String, Value>) -> Result<Self> {
+        serde_json::from_value(Value::Object(map.clone())).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Parse replace commit metadata from JSON bytes
+    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
+        serde_json::from_slice(bytes).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to parse commit 
metadata: {}", e))
+        })
+    }
+
+    /// Iterate over all replace file IDs across all partitions
+    pub fn iter_replace_file_ids(&self) -> impl Iterator<Item = (&String, 
&String)> {
+        self.partition_to_replace_file_ids
+            .iter()
+            .flat_map(|replace_ids| {
+                replace_ids.iter().flat_map(|(partition, file_ids)| {
+                    file_ids.iter().map(move |file_id| (partition, file_id))
+                })
+            })
+    }

Review Comment:
   Note on the iterator chain: `Option::iter()` yields at most one item — a 
`&HashMap` — and the enclosing `flat_map` then iterates that map's entries, so 
this code does visit every `(partition, file_id)` pair and behaves correctly. 
Rewriting it as `.as_ref().into_iter().flatten()` (or via an 
`.as_ref()?.iter()` helper) would express the same traversal more explicitly, 
but that is a stylistic change rather than a bug fix.



##########
crates/core/src/metadata/commit.rs:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::CoreError;
+use crate::Result;
+use apache_avro_derive::AvroSchema as DeriveAvroSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::collections::HashMap;
+
+/// Represents statistics for a single file write operation in a commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
+#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase")]
+#[avro(namespace = "org.apache.hudi.avro.model")]
+pub struct HoodieWriteStat {
+    #[avro(rename = "fileId")]
+    pub file_id: Option<String>,
+    pub path: Option<String>,
+    #[avro(rename = "baseFile")]
+    pub base_file: Option<String>,
+    #[avro(rename = "logFiles")]
+    pub log_files: Option<Vec<String>>,
+    #[avro(rename = "prevCommit")]
+    pub prev_commit: Option<String>,
+    #[avro(rename = "numWrites")]
+    pub num_writes: Option<i64>,
+    #[avro(rename = "numDeletes")]
+    pub num_deletes: Option<i64>,
+    #[avro(rename = "numUpdateWrites")]
+    pub num_update_writes: Option<i64>,
+    #[avro(rename = "numInserts")]
+    pub num_inserts: Option<i64>,
+    #[avro(rename = "totalWriteBytes")]
+    pub total_write_bytes: Option<i64>,
+    #[avro(rename = "totalWriteErrors")]
+    pub total_write_errors: Option<i64>,
+}
+
+/// Represents the metadata for a Hudi commit
+///
+/// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
+/// The Avro schema can be accessed via `HoodieCommitMetadata::get_schema()`.
+///
+/// # Example
+/// ```
+/// use hudi_core::metadata::commit::HoodieCommitMetadata;

Review Comment:
   The documentation example uses `hudi_core` as the crate name. Since rustdoc 
compiles doc examples as a separate, external crate, a `use crate::...` path 
would not resolve there — doc examples must import items via the library's 
public name. If the package in `crates/core` is named `hudi-core` (lib name 
`hudi_core`), the example is already correct as written; please verify the 
package name in `crates/core/Cargo.toml` rather than switching to a `crate::` 
path, which would fail to compile as a doc test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to