This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 586d210 style: enforce rust code style (#14)
586d210 is described below
commit 586d210f7593dd97f0b3305b5fb0bdbd08799123
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun May 5 17:33:54 2024 -0500
style: enforce rust code style (#14)
Use `rustfmt` and `clippy` to enforce code style through CI check job.
---
.github/workflows/ci.yml | 8 +--
Cargo.toml | 6 +-
Makefile | 31 +++++++++++
crates/core/src/error.rs | 11 ----
crates/core/src/file_group/mod.rs | 66 ++++++++++++----------
crates/core/src/table/file_system_view.rs | 64 ++++++++++++----------
crates/core/src/table/meta_client.rs | 83 +++++++++++++++-------------
crates/core/src/table/mod.rs | 25 +++++----
crates/core/src/timeline/mod.rs | 91 ++++++++++++++++---------------
crates/datafusion/src/bin/main.rs | 4 +-
crates/datafusion/src/lib.rs | 25 +++------
crates/fs/src/file_systems.rs | 29 ++++++----
rust-toolchain.toml | 21 +++++++
13 files changed, 267 insertions(+), 197 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 948a05b..4bc8235 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -34,9 +34,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
-
- - name: Check License Header
+ - name: Check license header
uses: apache/skywalking-eyes/[email protected]
+ - name: Check code style
+ run: make check
build:
runs-on: ${{ matrix.os }}
@@ -55,6 +56,5 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
-
- - name: Unit Test
+ - name: Unit test
run: cargo test --no-fail-fast --all-targets --all-features --workspace
diff --git a/Cargo.toml b/Cargo.toml
index a24ee76..8c9a163 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ members = [
resolver = "2"
[workspace.package]
-version = "0.2.0"
+version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
rust-version = "1.75.0"
@@ -31,7 +31,7 @@ rust-version = "1.75.0"
# arrow
arrow = { version = "50" }
arrow-arith = { version = "50" }
-arrow-array = { version = "50", features = ["chrono-tz"]}
+arrow-array = { version = "50", features = ["chrono-tz"] }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
@@ -68,4 +68,4 @@ uuid = { version = "1" }
async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
-num_cpus = { version = "1" }
\ No newline at end of file
+num_cpus = { version = "1" }
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..3a5ba3f
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+.EXPORT_ALL_VARIABLES:
+
+RUST_LOG = debug
+
+build:
+ cargo build
+
+check-fmt:
+ cargo fmt --all -- --check
+
+check-clippy:
+ cargo clippy --all-targets --all-features --workspace -- -D warnings
+
+check: check-fmt check-clippy
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index 9b6e3c0..f7f0bd7 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -19,24 +19,15 @@
use std::error::Error;
use std::fmt::Debug;
-use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum HudiFileGroupError {
- #[error("Base File {0} has unsupported format: {1}")]
- UnsupportedBaseFileFormat(String, String),
#[error("Commit time {0} is already present in File Group {1}")]
CommitTimeAlreadyExists(String, String),
}
-#[derive(Debug, Error)]
-pub enum HudiTimelineError {
- #[error("Error in reading commit metadata: {0}")]
- FailToReadCommitMetadata(io::Error),
-}
-
#[derive(Debug, Error)]
pub enum HudiFileSystemViewError {
#[error("Error in loading partitions: {0}")]
@@ -47,8 +38,6 @@ pub enum HudiFileSystemViewError {
pub enum HudiCoreError {
#[error("Failed to load file group")]
FailToLoadFileGroup(#[from] HudiFileGroupError),
- #[error("Failed to init timeline")]
- FailToInitTimeline(#[from] HudiTimelineError),
#[error("Failed to build file system view")]
FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 3472d93..609b6e6 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -26,7 +26,7 @@ use hudi_fs::file_systems::FileMetadata;
use crate::error::HudiFileGroupError;
use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct BaseFile {
pub file_group_id: String,
pub commit_time: String,
@@ -53,12 +53,13 @@ impl BaseFile {
}
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct FileSlice {
pub base_file: BaseFile,
pub partition_path: Option<String>,
}
+#[allow(dead_code)]
impl FileSlice {
pub fn file_path(&self) -> Option<&str> {
match &self.base_file.metadata {
@@ -99,6 +100,7 @@ impl fmt::Display for FileGroup {
}
}
+#[allow(dead_code)]
impl FileGroup {
pub fn new(id: String, partition_path: Option<String>) -> Self {
Self {
@@ -140,32 +142,38 @@ impl FileGroup {
}
}
-#[test]
-fn create_a_base_file_successfully() {
- let base_file =
- BaseFile::new("5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet");
- assert_eq!(
- base_file.file_group_id,
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- );
- assert_eq!(base_file.commit_time, "20240402144910683");
-}
+#[cfg(test)]
+mod tests {
+ use crate::file_group::{BaseFile, FileGroup};
+
+ #[test]
+ fn create_a_base_file_successfully() {
+ let base_file = BaseFile::new(
+ "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+ );
+ assert_eq!(
+ base_file.file_group_id,
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ );
+ assert_eq!(base_file.commit_time, "20240402144910683");
+ }
-#[test]
-fn load_a_valid_file_group() {
- let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
- let _ = fg.add_base_file_from_name(
- "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
- );
- let _ = fg.add_base_file_from_name(
- "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
- );
- assert_eq!(fg.file_slices.len(), 2);
- assert!(fg.partition_path.is_none());
- let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
- assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
- assert_eq!(
- fg.get_latest_file_slice().unwrap().base_file.commit_time,
- "20240402144910683"
- )
+ #[test]
+ fn load_a_valid_file_group() {
+ let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+ let _ = fg.add_base_file_from_name(
+ "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+ );
+ let _ = fg.add_base_file_from_name(
+ "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+ );
+ assert_eq!(fg.file_slices.len(), 2);
+ assert!(fg.partition_path.is_none());
+ let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
+ assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
+ assert_eq!(
+ fg.get_latest_file_slice().unwrap().base_file.commit_time,
+ "20240402144910683"
+ )
+ }
}
diff --git a/crates/core/src/table/file_system_view.rs b/crates/core/src/table/file_system_view.rs
index e084ee8..7714a8c 100644
--- a/crates/core/src/table/file_system_view.rs
+++ b/crates/core/src/table/file_system_view.rs
@@ -22,9 +22,6 @@ use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
use crate::file_group::{FileGroup, FileSlice};
use crate::table::meta_client::MetaClient;
use hashbrown::HashMap;
-use hudi_fs::test_utils::extract_test_table;
-use std::collections::HashSet;
-use std::path::Path;
pub struct FileSystemView {
meta_client: MetaClient,
@@ -62,37 +59,46 @@ impl FileSystemView {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.values() {
for fg in fgs {
- match fg.get_latest_file_slice() {
- Some(file_slice) => file_slices.push(file_slice.clone()),
- None => (),
+ if let Some(file_slice) = fg.get_latest_file_slice() {
+ file_slices.push(file_slice)
}
}
}
file_slices
}
}
-#[test]
-fn meta_client_get_file_groups() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let fs_view = FileSystemView::init(meta_client).unwrap();
- let file_slices = fs_view.get_latest_file_slices();
- assert_eq!(file_slices.len(), 5);
- let mut fg_ids = Vec::new();
- for f in file_slices {
- let fp = f.file_group_id();
- fg_ids.push(fp);
+
+#[cfg(test)]
+mod tests {
+ use crate::table::file_system_view::FileSystemView;
+ use crate::table::meta_client::MetaClient;
+ use hudi_fs::test_utils::extract_test_table;
+ use std::collections::HashSet;
+ use std::path::Path;
+
+ #[test]
+ fn meta_client_get_file_groups() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let fs_view = FileSystemView::init(meta_client).unwrap();
+ let file_slices = fs_view.get_latest_file_slices();
+ assert_eq!(file_slices.len(), 5);
+ let mut fg_ids = Vec::new();
+ for f in file_slices {
+ let fp = f.file_group_id();
+ fg_ids.push(fp);
+ }
+ let actual: HashSet<&str> = fg_ids.into_iter().collect();
+ assert_eq!(
+ actual,
+ HashSet::from_iter(vec![
+ "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+ "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+ "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+ "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ ])
+ );
}
- let actual: HashSet<&str> = fg_ids.into_iter().collect();
- assert_eq!(
- actual,
- HashSet::from_iter(vec![
- "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
- "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
- "ee915c68-d7f8-44f6-9759-e691add290d8-0",
- "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- ])
- );
}
diff --git a/crates/core/src/table/meta_client.rs b/crates/core/src/table/meta_client.rs
index 27f0cf9..15f8495 100644
--- a/crates/core/src/table/meta_client.rs
+++ b/crates/core/src/table/meta_client.rs
@@ -17,7 +17,6 @@
* under the License.
*/
-use std::collections::HashSet;
use std::error::Error;
use std::path::{Path, PathBuf};
use std::{fs, io};
@@ -25,7 +24,6 @@ use std::{fs, io};
use hashbrown::HashMap;
use hudi_fs::file_systems::FileMetadata;
-use hudi_fs::test_utils::extract_test_table;
use crate::file_group::{BaseFile, FileGroup};
use crate::timeline::Timeline;
@@ -98,7 +96,7 @@ impl MetaClient {
let fg_id = &base_file.file_group_id;
fg_id_to_base_files
.entry(fg_id.to_owned())
- .or_insert_with(Vec::new)
+ .or_default()
.push(base_file);
}
}
@@ -114,40 +112,49 @@ impl MetaClient {
}
}
-#[test]
-fn meta_client_get_partition_paths() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let partition_paths = meta_client.get_partition_paths().unwrap();
- let partition_path_set: HashSet<&str> = HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
- assert_eq!(
- partition_path_set,
- HashSet::from_iter(vec!["chennai", "sao_paulo", "san_francisco"])
- )
-}
+#[cfg(test)]
+mod tests {
+ use crate::table::meta_client::MetaClient;
+ use hudi_fs::test_utils::extract_test_table;
+ use std::collections::HashSet;
+ use std::path::Path;
-#[test]
-fn meta_client_get_file_groups() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- let file_groups = meta_client.get_file_groups("san_francisco").unwrap();
- assert_eq!(file_groups.len(), 3);
- let fg_ids: HashSet<&str> = HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
- assert_eq!(
- fg_ids,
- HashSet::from_iter(vec![
- "5a226868-2934-4f84-a16f-55124630c68d-0",
- "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
- "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
- ])
- );
-}
-#[test]
-fn meta_client_active_timeline_init_as_expected() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let meta_client = MetaClient::new(&target_table_path);
- assert_eq!(meta_client.timeline.instants.len(), 2)
+ #[test]
+ fn meta_client_get_partition_paths() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let partition_paths = meta_client.get_partition_paths().unwrap();
+ let partition_path_set: HashSet<&str> =
+ HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+ assert_eq!(
+ partition_path_set,
+ HashSet::from_iter(vec!["chennai", "sao_paulo", "san_francisco"])
+ )
+ }
+
+ #[test]
+ fn meta_client_get_file_groups() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let file_groups = meta_client.get_file_groups("san_francisco").unwrap();
+ assert_eq!(file_groups.len(), 3);
+ let fg_ids: HashSet<&str> = HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
+ assert_eq!(
+ fg_ids,
+ HashSet::from_iter(vec![
+ "5a226868-2934-4f84-a16f-55124630c68d-0",
+ "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+ "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
+ ])
+ );
+ }
+ #[test]
+ fn meta_client_active_timeline_init_as_expected() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ assert_eq!(meta_client.timeline.instants.len(), 2)
+ }
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0bc65dc..d1c7dc7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -18,12 +18,10 @@
*/
use std::error::Error;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use arrow_schema::SchemaRef;
-use hudi_fs::test_utils::extract_test_table;
-
use crate::table::file_system_view::FileSystemView;
use crate::table::meta_client::MetaClient;
@@ -68,11 +66,18 @@ impl Table {
}
}
-#[test]
-fn load_snapshot_file_paths() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
- assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
- println!("{}", hudi_table.schema().to_string());
+#[cfg(test)]
+mod tests {
+ use crate::table::Table;
+ use hudi_fs::test_utils::extract_test_table;
+ use std::path::Path;
+
+ #[test]
+ fn load_snapshot_file_paths() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
+ assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+ println!("{}", hudi_table.schema());
+ }
}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index c0c4bd9..75d89d5 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -21,23 +21,19 @@ use hudi_fs::file_name_without_ext;
use std::collections::HashMap;
use arrow_schema::SchemaRef;
-use hudi_fs::test_utils::extract_test_table;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::file::serialized_reader::SerializedFileReader;
-use serde::de::Error;
use serde_json::Value;
use std::fs::File;
use std::io::{ErrorKind, Read};
use std::path::{Path, PathBuf};
use std::{fs, io};
-use crate::error::HudiTimelineError;
-
+#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
pub enum State {
- REQUESTED,
- INFLIGHT,
- COMPLETED,
+ Requested,
+ Inflight,
+ Completed,
}
#[derive(Debug, Clone, PartialEq)]
@@ -50,9 +46,9 @@ pub struct Instant {
impl Instant {
pub fn state_suffix(&self) -> String {
match self.state {
- State::REQUESTED => ".requested".to_owned(),
- State::INFLIGHT => ".inflight".to_owned(),
- State::COMPLETED => "".to_owned(),
+ State::Requested => ".requested".to_owned(),
+ State::Inflight => ".inflight".to_owned(),
+ State::Completed => "".to_owned(),
}
}
@@ -84,7 +80,7 @@ impl Timeline {
let p = entry?.path();
if p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("commit") {
completed_commits.push(Instant {
- state: State::COMPLETED,
+ state: State::Completed,
timestamp: file_name_without_ext(p.file_name()),
action: "commit".to_owned(),
})
@@ -107,15 +103,16 @@ impl Timeline {
let commit_metadata = serde_json::from_str(&content)?;
Ok(commit_metadata)
}
- None => return Ok(HashMap::new()),
+ None => Ok(HashMap::new()),
}
}
pub fn get_latest_schema(&self) -> Result<SchemaRef, io::Error> {
let commit_metadata = self.get_latest_commit_metadata()?;
- if let Some(partitionToWriteStats) = commit_metadata["partitionToWriteStats"].as_object() {
- for (_key, value) in partitionToWriteStats {
- if let Some(first_value) = value.as_array().and_then(|arr| arr.get(0)) {
+ if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object()
+ {
+ if let Some((_, value)) = partition_to_write_stats.iter().next() {
+ if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
if let Some(path) = first_value["path"].as_str() {
let mut base_file_path = PathBuf::from(&self.base_path);
base_file_path.push(path);
@@ -124,7 +121,6 @@ impl Timeline {
return Ok(builder.schema().to_owned());
}
}
- break;
}
}
Err(io::Error::new(
@@ -134,32 +130,39 @@ impl Timeline {
}
}
-#[test]
-fn init_commits_timeline() {
- let fixture_path = Path::new("fixtures/timeline/commits_stub");
- let timeline = Timeline::init(fixture_path).unwrap();
- assert_eq!(
- timeline.instants,
- vec![
- Instant {
- state: State::COMPLETED,
- action: "commit".to_owned(),
- timestamp: "20240402123035233".to_owned(),
- },
- Instant {
- state: State::COMPLETED,
- action: "commit".to_owned(),
- timestamp: "20240402144910683".to_owned(),
- },
- ]
- )
-}
+#[cfg(test)]
+mod tests {
+ use crate::timeline::{Instant, State, Timeline};
+ use hudi_fs::test_utils::extract_test_table;
+ use std::path::Path;
+
+ #[test]
+ fn read_latest_schema() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let timeline = Timeline::init(target_table_path.as_path()).unwrap();
+ let table_schema = timeline.get_latest_schema().unwrap();
+ assert_eq!(table_schema.fields.len(), 11)
+ }
-#[test]
-fn read_latest_schema() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let timeline = Timeline::init(target_table_path.as_path()).unwrap();
- let table_schema = timeline.get_latest_schema().unwrap();
- assert_eq!(table_schema.fields.len(), 11)
+ #[test]
+ fn init_commits_timeline() {
+ let fixture_path = Path::new("fixtures/timeline/commits_stub");
+ let timeline = Timeline::init(fixture_path).unwrap();
+ assert_eq!(
+ timeline.instants,
+ vec![
+ Instant {
+ state: State::Completed,
+ action: "commit".to_owned(),
+ timestamp: "20240402123035233".to_owned(),
+ },
+ Instant {
+ state: State::Completed,
+ action: "commit".to_owned(),
+ timestamp: "20240402144910683".to_owned(),
+ },
+ ]
+ )
+ }
}
diff --git a/crates/datafusion/src/bin/main.rs b/crates/datafusion/src/bin/main.rs
index 1d96865..fc0e9da 100644
--- a/crates/datafusion/src/bin/main.rs
+++ b/crates/datafusion/src/bin/main.rs
@@ -29,7 +29,9 @@ async fn main() -> Result<()> {
let ctx = SessionContext::new();
let hudi = HudiDataSource::new("/tmp/trips_table");
ctx.register_table("trips_table", Arc::new(hudi))?;
- let df: DataFrame = ctx.sql("SELECT * from trips_table where fare > 20.0").await?;
+ let df: DataFrame = ctx
+ .sql("SELECT * from trips_table where fare > 20.0")
+ .await?;
df.show().await?;
Ok(())
}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index d25a530..ff603de 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -19,30 +19,23 @@
use arrow_array::RecordBatch;
use std::any::Any;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
use std::fs::File;
-use std::path::Path;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
-use datafusion;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
-use datafusion::prelude::{DataFrame, SessionContext};
-use datafusion_common;
use datafusion_common::{project_schema, DataFusionError};
-use datafusion_expr;
use datafusion_expr::{Expr, TableType};
-use datafusion_physical_expr;
use datafusion_physical_expr::PhysicalSortExpr;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use hudi_core::HudiTable;
-use hudi_fs::test_utils::extract_test_table;
#[derive(Debug, Clone)]
pub struct HudiDataSource {
@@ -76,7 +69,7 @@ impl HudiDataSource {
}
Ok(record_batches)
}
- Err(e) => Err(DataFusionError::Execution(
+ Err(_e) => Err(DataFusionError::Execution(
"Failed to read records from table.".to_owned(),
)),
}
@@ -99,10 +92,10 @@ impl TableProvider for HudiDataSource {
async fn scan(
&self,
- state: &SessionState,
+ _state: &SessionState,
projection: Option<&Vec<usize>>,
- filters: &[Expr],
- limit: Option<usize>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
return self.create_physical_plan(projection, self.schema()).await;
}
@@ -157,15 +150,15 @@ impl ExecutionPlan for HudiExec {
fn with_new_children(
self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ _children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
- partition: usize,
- context: Arc<TaskContext>,
+ _partition: usize,
+ _context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let data = self.data_source.get_record_batches()?;
Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
diff --git a/crates/fs/src/file_systems.rs b/crates/fs/src/file_systems.rs
index 36f7ab3..7160b7d 100644
--- a/crates/fs/src/file_systems.rs
+++ b/crates/fs/src/file_systems.rs
@@ -20,11 +20,9 @@
use std::error::Error;
use std::{fs::File, path::Path};
-use parquet::file::reader::{FileReader, Length, SerializedFileReader};
+use parquet::file::reader::{FileReader, SerializedFileReader};
-use crate::assert_approx_eq;
-
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct FileMetadata {
pub path: String,
pub name: String,
@@ -46,12 +44,19 @@ impl FileMetadata {
}
}
-#[test]
-fn read_file_metadata() {
- let fixture_path = Path::new("fixtures/a.parquet");
- let fm = FileMetadata::from_path(fixture_path).unwrap();
- assert_eq!(fm.path, "fixtures/a.parquet");
- assert_eq!(fm.name, "a.parquet");
- assert_approx_eq!(fm.size, 866, 20);
- assert_eq!(fm.num_records, 5);
+#[cfg(test)]
+mod tests {
+ use crate::assert_approx_eq;
+ use crate::file_systems::FileMetadata;
+ use std::path::Path;
+
+ #[test]
+ fn read_file_metadata() {
+ let fixture_path = Path::new("fixtures/a.parquet");
+ let fm = FileMetadata::from_path(fixture_path).unwrap();
+ assert_eq!(fm.path, "fixtures/a.parquet");
+ assert_eq!(fm.name, "a.parquet");
+ assert_approx_eq!(fm.size, 866, 20);
+ assert_eq!(fm.num_records, 5);
+ }
}
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..6d31177
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[toolchain]
+channel = "1.75"
+components = ["rustfmt", "clippy"]
+profile = "minimal"