This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 586d210  style: enforce rust code style (#14)
586d210 is described below

commit 586d210f7593dd97f0b3305b5fb0bdbd08799123
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun May 5 17:33:54 2024 -0500

    style: enforce rust code style (#14)
    
    Use `rustfmt` and `clippy` to enforce code style through CI check job.
---
 .github/workflows/ci.yml                  |  8 +--
 Cargo.toml                                |  6 +-
 Makefile                                  | 31 +++++++++++
 crates/core/src/error.rs                  | 11 ----
 crates/core/src/file_group/mod.rs         | 66 ++++++++++++----------
 crates/core/src/table/file_system_view.rs | 64 ++++++++++++----------
 crates/core/src/table/meta_client.rs      | 83 +++++++++++++++-------------
 crates/core/src/table/mod.rs              | 25 +++++----
 crates/core/src/timeline/mod.rs           | 91 ++++++++++++++++---------------
 crates/datafusion/src/bin/main.rs         |  4 +-
 crates/datafusion/src/lib.rs              | 25 +++------
 crates/fs/src/file_systems.rs             | 29 ++++++----
 rust-toolchain.toml                       | 21 +++++++
 13 files changed, 267 insertions(+), 197 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 948a05b..4bc8235 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -34,9 +34,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
-
-      - name: Check License Header
+      - name: Check license header
         uses: apache/skywalking-eyes/[email protected]
+      - name: Check code style
+        run: make check
 
   build:
     runs-on: ${{ matrix.os }}
@@ -55,6 +56,5 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
-
-      - name: Unit Test
+      - name: Unit test
         run: cargo test --no-fail-fast --all-targets --all-features --workspace
diff --git a/Cargo.toml b/Cargo.toml
index a24ee76..8c9a163 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "0.2.0"
+version = "0.1.0"
 edition = "2021"
 license = "Apache-2.0"
 rust-version = "1.75.0"
@@ -31,7 +31,7 @@ rust-version = "1.75.0"
 # arrow
 arrow = { version = "50" }
 arrow-arith = { version = "50" }
-arrow-array = { version = "50", features = ["chrono-tz"]}
+arrow-array = { version = "50", features = ["chrono-tz"] }
 arrow-buffer = { version = "50" }
 arrow-cast = { version = "50" }
 arrow-ipc = { version = "50" }
@@ -68,4 +68,4 @@ uuid = { version = "1" }
 async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
-num_cpus = { version = "1" }
\ No newline at end of file
+num_cpus = { version = "1" }
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..3a5ba3f
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+.EXPORT_ALL_VARIABLES:
+
+RUST_LOG = debug
+
+build:
+       cargo build
+
+check-fmt:
+       cargo fmt --all -- --check
+
+check-clippy:
+       cargo clippy --all-targets --all-features --workspace -- -D warnings
+
+check: check-fmt check-clippy
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 9b6e3c0..f7f0bd7 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -19,24 +19,15 @@
 
 use std::error::Error;
 use std::fmt::Debug;
-use std::io;
 
 use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum HudiFileGroupError {
-    #[error("Base File {0} has unsupported format: {1}")]
-    UnsupportedBaseFileFormat(String, String),
     #[error("Commit time {0} is already present in File Group {1}")]
     CommitTimeAlreadyExists(String, String),
 }
 
-#[derive(Debug, Error)]
-pub enum HudiTimelineError {
-    #[error("Error in reading commit metadata: {0}")]
-    FailToReadCommitMetadata(io::Error),
-}
-
 #[derive(Debug, Error)]
 pub enum HudiFileSystemViewError {
     #[error("Error in loading partitions: {0}")]
@@ -47,8 +38,6 @@ pub enum HudiFileSystemViewError {
 pub enum HudiCoreError {
     #[error("Failed to load file group")]
     FailToLoadFileGroup(#[from] HudiFileGroupError),
-    #[error("Failed to init timeline")]
-    FailToInitTimeline(#[from] HudiTimelineError),
     #[error("Failed to build file system view")]
     FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
 }
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 3472d93..609b6e6 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -26,7 +26,7 @@ use hudi_fs::file_systems::FileMetadata;
 use crate::error::HudiFileGroupError;
 use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct BaseFile {
     pub file_group_id: String,
     pub commit_time: String,
@@ -53,12 +53,13 @@ impl BaseFile {
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct FileSlice {
     pub base_file: BaseFile,
     pub partition_path: Option<String>,
 }
 
+#[allow(dead_code)]
 impl FileSlice {
     pub fn file_path(&self) -> Option<&str> {
         match &self.base_file.metadata {
@@ -99,6 +100,7 @@ impl fmt::Display for FileGroup {
     }
 }
 
+#[allow(dead_code)]
 impl FileGroup {
     pub fn new(id: String, partition_path: Option<String>) -> Self {
         Self {
@@ -140,32 +142,38 @@ impl FileGroup {
     }
 }
 
-#[test]
-fn create_a_base_file_successfully() {
-    let base_file =
-        BaseFile::new("5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet");
-    assert_eq!(
-        base_file.file_group_id,
-        "5a226868-2934-4f84-a16f-55124630c68d-0"
-    );
-    assert_eq!(base_file.commit_time, "20240402144910683");
-}
+#[cfg(test)]
+mod tests {
+    use crate::file_group::{BaseFile, FileGroup};
+
+    #[test]
+    fn create_a_base_file_successfully() {
+        let base_file = BaseFile::new(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+        );
+        assert_eq!(
+            base_file.file_group_id,
+            "5a226868-2934-4f84-a16f-55124630c68d-0"
+        );
+        assert_eq!(base_file.commit_time, "20240402144910683");
+    }
 
-#[test]
-fn load_a_valid_file_group() {
-    let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
-    let _ = fg.add_base_file_from_name(
-        "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
-    );
-    let _ = fg.add_base_file_from_name(
-        "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
-    );
-    assert_eq!(fg.file_slices.len(), 2);
-    assert!(fg.partition_path.is_none());
-    let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
-    assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
-    assert_eq!(
-        fg.get_latest_file_slice().unwrap().base_file.commit_time,
-        "20240402144910683"
-    )
+    #[test]
+    fn load_a_valid_file_group() {
+        let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+        let _ = fg.add_base_file_from_name(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+        );
+        let _ = fg.add_base_file_from_name(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+        );
+        assert_eq!(fg.file_slices.len(), 2);
+        assert!(fg.partition_path.is_none());
+        let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
+        assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
+        assert_eq!(
+            fg.get_latest_file_slice().unwrap().base_file.commit_time,
+            "20240402144910683"
+        )
+    }
 }
diff --git a/crates/core/src/table/file_system_view.rs b/crates/core/src/table/file_system_view.rs
index e084ee8..7714a8c 100644
--- a/crates/core/src/table/file_system_view.rs
+++ b/crates/core/src/table/file_system_view.rs
@@ -22,9 +22,6 @@ use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
 use crate::file_group::{FileGroup, FileSlice};
 use crate::table::meta_client::MetaClient;
 use hashbrown::HashMap;
-use hudi_fs::test_utils::extract_test_table;
-use std::collections::HashSet;
-use std::path::Path;
 
 pub struct FileSystemView {
     meta_client: MetaClient,
@@ -62,37 +59,46 @@ impl FileSystemView {
         let mut file_slices = Vec::new();
         for fgs in self.partition_to_file_groups.values() {
             for fg in fgs {
-                match fg.get_latest_file_slice() {
-                    Some(file_slice) => file_slices.push(file_slice.clone()),
-                    None => (),
+                if let Some(file_slice) = fg.get_latest_file_slice() {
+                    file_slices.push(file_slice)
                 }
             }
         }
         file_slices
     }
 }
-#[test]
-fn meta_client_get_file_groups() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let meta_client = MetaClient::new(&target_table_path);
-    let fs_view = FileSystemView::init(meta_client).unwrap();
-    let file_slices = fs_view.get_latest_file_slices();
-    assert_eq!(file_slices.len(), 5);
-    let mut fg_ids = Vec::new();
-    for f in file_slices {
-        let fp = f.file_group_id();
-        fg_ids.push(fp);
+
+#[cfg(test)]
+mod tests {
+    use crate::table::file_system_view::FileSystemView;
+    use crate::table::meta_client::MetaClient;
+    use hudi_fs::test_utils::extract_test_table;
+    use std::collections::HashSet;
+    use std::path::Path;
+
+    #[test]
+    fn meta_client_get_file_groups() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let meta_client = MetaClient::new(&target_table_path);
+        let fs_view = FileSystemView::init(meta_client).unwrap();
+        let file_slices = fs_view.get_latest_file_slices();
+        assert_eq!(file_slices.len(), 5);
+        let mut fg_ids = Vec::new();
+        for f in file_slices {
+            let fp = f.file_group_id();
+            fg_ids.push(fp);
+        }
+        let actual: HashSet<&str> = fg_ids.into_iter().collect();
+        assert_eq!(
+            actual,
+            HashSet::from_iter(vec![
+                "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+                "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+                "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+                "5a226868-2934-4f84-a16f-55124630c68d-0"
+            ])
+        );
     }
-    let actual: HashSet<&str> = fg_ids.into_iter().collect();
-    assert_eq!(
-        actual,
-        HashSet::from_iter(vec![
-            "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
-            "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
-            "ee915c68-d7f8-44f6-9759-e691add290d8-0",
-            "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        ])
-    );
 }
diff --git a/crates/core/src/table/meta_client.rs b/crates/core/src/table/meta_client.rs
index 27f0cf9..15f8495 100644
--- a/crates/core/src/table/meta_client.rs
+++ b/crates/core/src/table/meta_client.rs
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-use std::collections::HashSet;
 use std::error::Error;
 use std::path::{Path, PathBuf};
 use std::{fs, io};
@@ -25,7 +24,6 @@ use std::{fs, io};
 use hashbrown::HashMap;
 
 use hudi_fs::file_systems::FileMetadata;
-use hudi_fs::test_utils::extract_test_table;
 
 use crate::file_group::{BaseFile, FileGroup};
 use crate::timeline::Timeline;
@@ -98,7 +96,7 @@ impl MetaClient {
                 let fg_id = &base_file.file_group_id;
                 fg_id_to_base_files
                     .entry(fg_id.to_owned())
-                    .or_insert_with(Vec::new)
+                    .or_default()
                     .push(base_file);
             }
         }
@@ -114,40 +112,49 @@ impl MetaClient {
     }
 }
 
-#[test]
-fn meta_client_get_partition_paths() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let meta_client = MetaClient::new(&target_table_path);
-    let partition_paths = meta_client.get_partition_paths().unwrap();
-    let partition_path_set: HashSet<&str> = HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
-    assert_eq!(
-        partition_path_set,
-        HashSet::from_iter(vec!["chennai", "sao_paulo", "san_francisco"])
-    )
-}
+#[cfg(test)]
+mod tests {
+    use crate::table::meta_client::MetaClient;
+    use hudi_fs::test_utils::extract_test_table;
+    use std::collections::HashSet;
+    use std::path::Path;
 
-#[test]
-fn meta_client_get_file_groups() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let meta_client = MetaClient::new(&target_table_path);
-    let file_groups = meta_client.get_file_groups("san_francisco").unwrap();
-    assert_eq!(file_groups.len(), 3);
-    let fg_ids: HashSet<&str> = HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
-    assert_eq!(
-        fg_ids,
-        HashSet::from_iter(vec![
-            "5a226868-2934-4f84-a16f-55124630c68d-0",
-            "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
-            "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
-        ])
-    );
-}
-#[test]
-fn meta_client_active_timeline_init_as_expected() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let meta_client = MetaClient::new(&target_table_path);
-    assert_eq!(meta_client.timeline.instants.len(), 2)
+    #[test]
+    fn meta_client_get_partition_paths() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let meta_client = MetaClient::new(&target_table_path);
+        let partition_paths = meta_client.get_partition_paths().unwrap();
+        let partition_path_set: HashSet<&str> =
+            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+        assert_eq!(
+            partition_path_set,
+            HashSet::from_iter(vec!["chennai", "sao_paulo", "san_francisco"])
+        )
+    }
+
+    #[test]
+    fn meta_client_get_file_groups() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let meta_client = MetaClient::new(&target_table_path);
+        let file_groups = meta_client.get_file_groups("san_francisco").unwrap();
+        assert_eq!(file_groups.len(), 3);
+        let fg_ids: HashSet<&str> = HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
+        assert_eq!(
+            fg_ids,
+            HashSet::from_iter(vec![
+                "5a226868-2934-4f84-a16f-55124630c68d-0",
+                "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
+            ])
+        );
+    }
+    #[test]
+    fn meta_client_active_timeline_init_as_expected() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let meta_client = MetaClient::new(&target_table_path);
+        assert_eq!(meta_client.timeline.instants.len(), 2)
+    }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0bc65dc..d1c7dc7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -18,12 +18,10 @@
  */
 
 use std::error::Error;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
 
 use arrow_schema::SchemaRef;
 
-use hudi_fs::test_utils::extract_test_table;
-
 use crate::table::file_system_view::FileSystemView;
 use crate::table::meta_client::MetaClient;
 
@@ -68,11 +66,18 @@ impl Table {
     }
 }
 
-#[test]
-fn load_snapshot_file_paths() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
-    assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
-    println!("{}", hudi_table.schema().to_string());
+#[cfg(test)]
+mod tests {
+    use crate::table::Table;
+    use hudi_fs::test_utils::extract_test_table;
+    use std::path::Path;
+
+    #[test]
+    fn load_snapshot_file_paths() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
+        assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+        println!("{}", hudi_table.schema());
+    }
 }
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index c0c4bd9..75d89d5 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -21,23 +21,19 @@ use hudi_fs::file_name_without_ext;
 use std::collections::HashMap;
 
 use arrow_schema::SchemaRef;
-use hudi_fs::test_utils::extract_test_table;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::file::serialized_reader::SerializedFileReader;
-use serde::de::Error;
 use serde_json::Value;
 use std::fs::File;
 use std::io::{ErrorKind, Read};
 use std::path::{Path, PathBuf};
 use std::{fs, io};
 
-use crate::error::HudiTimelineError;
-
+#[allow(dead_code)]
 #[derive(Debug, Clone, PartialEq)]
 pub enum State {
-    REQUESTED,
-    INFLIGHT,
-    COMPLETED,
+    Requested,
+    Inflight,
+    Completed,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -50,9 +46,9 @@ pub struct Instant {
 impl Instant {
     pub fn state_suffix(&self) -> String {
         match self.state {
-            State::REQUESTED => ".requested".to_owned(),
-            State::INFLIGHT => ".inflight".to_owned(),
-            State::COMPLETED => "".to_owned(),
+            State::Requested => ".requested".to_owned(),
+            State::Inflight => ".inflight".to_owned(),
+            State::Completed => "".to_owned(),
         }
     }
 
@@ -84,7 +80,7 @@ impl Timeline {
             let p = entry?.path();
             if p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("commit") {
                 completed_commits.push(Instant {
-                    state: State::COMPLETED,
+                    state: State::Completed,
                     timestamp: file_name_without_ext(p.file_name()),
                     action: "commit".to_owned(),
                 })
@@ -107,15 +103,16 @@ impl Timeline {
                 let commit_metadata = serde_json::from_str(&content)?;
                 Ok(commit_metadata)
             }
-            None => return Ok(HashMap::new()),
+            None => Ok(HashMap::new()),
         }
     }
 
     pub fn get_latest_schema(&self) -> Result<SchemaRef, io::Error> {
         let commit_metadata = self.get_latest_commit_metadata()?;
-        if let Some(partitionToWriteStats) = commit_metadata["partitionToWriteStats"].as_object() {
-            for (_key, value) in partitionToWriteStats {
-                if let Some(first_value) = value.as_array().and_then(|arr| arr.get(0)) {
+        if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object()
+        {
+            if let Some((_, value)) = partition_to_write_stats.iter().next() {
+                if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
                     if let Some(path) = first_value["path"].as_str() {
                         let mut base_file_path = PathBuf::from(&self.base_path);
                         base_file_path.push(path);
@@ -124,7 +121,6 @@ impl Timeline {
                         return Ok(builder.schema().to_owned());
                     }
                 }
-                break;
             }
         }
         Err(io::Error::new(
@@ -134,32 +130,39 @@ impl Timeline {
     }
 }
 
-#[test]
-fn init_commits_timeline() {
-    let fixture_path = Path::new("fixtures/timeline/commits_stub");
-    let timeline = Timeline::init(fixture_path).unwrap();
-    assert_eq!(
-        timeline.instants,
-        vec![
-            Instant {
-                state: State::COMPLETED,
-                action: "commit".to_owned(),
-                timestamp: "20240402123035233".to_owned(),
-            },
-            Instant {
-                state: State::COMPLETED,
-                action: "commit".to_owned(),
-                timestamp: "20240402144910683".to_owned(),
-            },
-        ]
-    )
-}
+#[cfg(test)]
+mod tests {
+    use crate::timeline::{Instant, State, Timeline};
+    use hudi_fs::test_utils::extract_test_table;
+    use std::path::Path;
+
+    #[test]
+    fn read_latest_schema() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let target_table_path = extract_test_table(fixture_path);
+        let timeline = Timeline::init(target_table_path.as_path()).unwrap();
+        let table_schema = timeline.get_latest_schema().unwrap();
+        assert_eq!(table_schema.fields.len(), 11)
+    }
 
-#[test]
-fn read_latest_schema() {
-    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-    let target_table_path = extract_test_table(fixture_path);
-    let timeline = Timeline::init(target_table_path.as_path()).unwrap();
-    let table_schema = timeline.get_latest_schema().unwrap();
-    assert_eq!(table_schema.fields.len(), 11)
+    #[test]
+    fn init_commits_timeline() {
+        let fixture_path = Path::new("fixtures/timeline/commits_stub");
+        let timeline = Timeline::init(fixture_path).unwrap();
+        assert_eq!(
+            timeline.instants,
+            vec![
+                Instant {
+                    state: State::Completed,
+                    action: "commit".to_owned(),
+                    timestamp: "20240402123035233".to_owned(),
+                },
+                Instant {
+                    state: State::Completed,
+                    action: "commit".to_owned(),
+                    timestamp: "20240402144910683".to_owned(),
+                },
+            ]
+        )
+    }
 }
diff --git a/crates/datafusion/src/bin/main.rs b/crates/datafusion/src/bin/main.rs
index 1d96865..fc0e9da 100644
--- a/crates/datafusion/src/bin/main.rs
+++ b/crates/datafusion/src/bin/main.rs
@@ -29,7 +29,9 @@ async fn main() -> Result<()> {
     let ctx = SessionContext::new();
     let hudi = HudiDataSource::new("/tmp/trips_table");
     ctx.register_table("trips_table", Arc::new(hudi))?;
-    let df: DataFrame = ctx.sql("SELECT * from trips_table where fare > 20.0").await?;
+    let df: DataFrame = ctx
+        .sql("SELECT * from trips_table where fare > 20.0")
+        .await?;
     df.show().await?;
     Ok(())
 }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index d25a530..ff603de 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -19,30 +19,23 @@
 
 use arrow_array::RecordBatch;
 use std::any::Any;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
 use std::fs::File;
-use std::path::Path;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
-use datafusion;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
-use datafusion::prelude::{DataFrame, SessionContext};
-use datafusion_common;
 use datafusion_common::{project_schema, DataFusionError};
-use datafusion_expr;
 use datafusion_expr::{Expr, TableType};
-use datafusion_physical_expr;
 use datafusion_physical_expr::PhysicalSortExpr;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 
 use hudi_core::HudiTable;
-use hudi_fs::test_utils::extract_test_table;
 
 #[derive(Debug, Clone)]
 pub struct HudiDataSource {
@@ -76,7 +69,7 @@ impl HudiDataSource {
                 }
                 Ok(record_batches)
             }
-            Err(e) => Err(DataFusionError::Execution(
+            Err(_e) => Err(DataFusionError::Execution(
                 "Failed to read records from table.".to_owned(),
             )),
         }
@@ -99,10 +92,10 @@ impl TableProvider for HudiDataSource {
 
     async fn scan(
         &self,
-        state: &SessionState,
+        _state: &SessionState,
         projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         return self.create_physical_plan(projection, self.schema()).await;
     }
@@ -157,15 +150,15 @@ impl ExecutionPlan for HudiExec {
 
     fn with_new_children(
         self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
 
     fn execute(
         &self,
-        partition: usize,
-        context: Arc<TaskContext>,
+        _partition: usize,
+        _context: Arc<TaskContext>,
     ) -> datafusion_common::Result<SendableRecordBatchStream> {
         let data = self.data_source.get_record_batches()?;
         Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
diff --git a/crates/fs/src/file_systems.rs b/crates/fs/src/file_systems.rs
index 36f7ab3..7160b7d 100644
--- a/crates/fs/src/file_systems.rs
+++ b/crates/fs/src/file_systems.rs
@@ -20,11 +20,9 @@
 use std::error::Error;
 use std::{fs::File, path::Path};
 
-use parquet::file::reader::{FileReader, Length, SerializedFileReader};
+use parquet::file::reader::{FileReader, SerializedFileReader};
 
-use crate::assert_approx_eq;
-
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct FileMetadata {
     pub path: String,
     pub name: String,
@@ -46,12 +44,19 @@ impl FileMetadata {
     }
 }
 
-#[test]
-fn read_file_metadata() {
-    let fixture_path = Path::new("fixtures/a.parquet");
-    let fm = FileMetadata::from_path(fixture_path).unwrap();
-    assert_eq!(fm.path, "fixtures/a.parquet");
-    assert_eq!(fm.name, "a.parquet");
-    assert_approx_eq!(fm.size, 866, 20);
-    assert_eq!(fm.num_records, 5);
+#[cfg(test)]
+mod tests {
+    use crate::assert_approx_eq;
+    use crate::file_systems::FileMetadata;
+    use std::path::Path;
+
+    #[test]
+    fn read_file_metadata() {
+        let fixture_path = Path::new("fixtures/a.parquet");
+        let fm = FileMetadata::from_path(fixture_path).unwrap();
+        assert_eq!(fm.path, "fixtures/a.parquet");
+        assert_eq!(fm.name, "a.parquet");
+        assert_approx_eq!(fm.size, 866, 20);
+        assert_eq!(fm.num_records, 5);
+    }
 }
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..6d31177
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[toolchain]
+channel = "1.75"
+components = ["rustfmt", "clippy"]
+profile = "minimal"

Reply via email to