devillove084 commented on code in PR #51:
URL: https://github.com/apache/paimon-rust/pull/51#discussion_r1719762195
##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
use crate::error::*;
use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
#[derive(Clone, Debug)]
pub struct FileIO {
- op: Operator,
+ inner_storage: Arc<Storage>,
}
impl FileIO {
- /// Create a new FileIO.
+ /// Try to infer file io scheme from path.
///
/// The input HashMap is paimon-java's
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
- ///
- /// TODO: Support building Operator from HashMap via options.
- pub fn new(_: HashMap<String, String>) -> Result<Self> {
- let op = Operator::new(Fs::default().root("/"))
- .context(IoUnexpectedSnafu {
- message: "Failed to create operator".to_string(),
- })?
- .finish();
- Ok(Self { op })
+ pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+ let url = Url::parse(path.as_ref())
+ .context(UrlParseSnafu)
+ .or_else(|_| {
+ Url::from_file_path(path.as_ref()).map_err(|_|
Error::DataInvalid {
+ message: "Input is neither a valid URL nor a valid file
path".to_string(),
+ })
+ })?;
+
+ Ok(FileIOBuilder::new(url.scheme()))
}
/// Create a new input file to read data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
- pub fn new_input(&self, path: &str) -> InputFile {
- InputFile {
- _op: self.op.clone(),
- path: path.to_string(),
- }
+ pub fn new_input(&self, path: impl AsRef<str>) -> crate::Result<InputFile>
{
Review Comment:
No problem
##########
crates/paimon/src/io/file_io.rs:
##########
@@ -17,66 +17,80 @@
use crate::error::*;
use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
#[derive(Clone, Debug)]
pub struct FileIO {
- op: Operator,
+ inner_storage: Arc<Storage>,
}
impl FileIO {
- /// Create a new FileIO.
+ /// Try to infer file io scheme from path.
///
/// The input HashMap is paimon-java's
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
- ///
- /// TODO: Support building Operator from HashMap via options.
- pub fn new(_: HashMap<String, String>) -> Result<Self> {
- let op = Operator::new(Fs::default().root("/"))
- .context(IoUnexpectedSnafu {
- message: "Failed to create operator".to_string(),
- })?
- .finish();
- Ok(Self { op })
+ pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+ let url = Url::parse(path.as_ref())
+ .context(UrlParseSnafu)
+ .or_else(|_| {
+ Url::from_file_path(path.as_ref()).map_err(|_|
Error::DataInvalid {
+ message: "Input is neither a valid URL nor a valid file
path".to_string(),
+ })
+ })?;
+
+ Ok(FileIOBuilder::new(url.scheme()))
}
/// Create a new input file to read data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
- pub fn new_input(&self, path: &str) -> InputFile {
- InputFile {
- _op: self.op.clone(),
- path: path.to_string(),
- }
+ pub fn new_input(&self, path: impl AsRef<str>) -> crate::Result<InputFile>
{
+ let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+ let path = path.as_ref().to_string();
+ let relative_path_pos = path.len() - relative_path.len();
+ Ok(InputFile {
+ op,
+ path,
+ relative_path_pos,
+ })
}
/// Create a new output file to write data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
- pub fn new_output(&self, path: &str) -> OutputFile {
- OutputFile {
- _op: self.op.clone(),
- path: path.to_string(),
- }
+ pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
+ let (op, relative_path) = self.inner_storage.create_operator(&path)?;
+ let path = path.as_ref().to_string();
+ let relative_path_pos = path.len() - relative_path.len();
+ Ok(OutputFile {
+ op,
+ path,
+ relative_path_pos,
+ })
}
/// Return a file status object that represents the path.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
- pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
- let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
- message: "Failed to get file status".to_string(),
+ pub async fn get_status(&self, path: String) -> Result<FileStatus> {
Review Comment:
I will check the changes carefully later to ensure consistency.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]