This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3b8121e feat: Extract FileRead and FileWrite trait (#364)
3b8121e is described below
commit 3b8121eaa9e9628536093836dcc41119716afd9e
Author: Xuanwo <[email protected]>
AuthorDate: Tue May 14 22:31:42 2024 +0800
feat: Extract FileRead and FileWrite trait (#364)
* feat: Extract FileRead and FileWrite trait
Signed-off-by: Xuanwo <[email protected]>
* Enable s3 services for tests
Signed-off-by: Xuanwo <[email protected]>
* Fix sort
Signed-off-by: Xuanwo <[email protected]>
* Add comment for io trait
Signed-off-by: Xuanwo <[email protected]>
* Fix test for rest
Signed-off-by: Xuanwo <[email protected]>
* Use try join
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.toml | 12 +--
crates/catalog/glue/Cargo.toml | 1 +
crates/catalog/glue/src/catalog.rs | 15 +--
crates/catalog/hms/Cargo.toml | 1 +
crates/catalog/hms/src/catalog.rs | 15 +--
crates/catalog/rest/Cargo.toml | 1 +
crates/iceberg/src/arrow/reader.rs | 59 ++++++++--
crates/iceberg/src/io.rs | 119 +++++++++++++++++----
crates/iceberg/src/spec/manifest.rs | 10 +-
crates/iceberg/src/spec/manifest_list.rs | 12 +--
crates/iceberg/src/spec/snapshot.rs | 9 +-
crates/iceberg/src/table.rs | 20 ++--
.../src/writer/file_writer/parquet_writer.rs | 110 ++++++++++++++++++-
.../iceberg/src/writer/file_writer/track_writer.rs | 47 +++-----
crates/iceberg/src/writer/mod.rs | 17 +--
crates/iceberg/tests/file_io_s3_test.rs | 15 +--
crates/integrations/datafusion/Cargo.toml | 1 +
17 files changed, 315 insertions(+), 149 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index d1894c1..57c3436 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,11 @@
[workspace]
resolver = "2"
members = [
- "crates/catalog/*",
- "crates/examples",
- "crates/iceberg",
- "crates/integrations/*",
- "crates/test_utils",
+ "crates/catalog/*",
+ "crates/examples",
+ "crates/iceberg",
+ "crates/integrations/*",
+ "crates/test_utils",
]
[workspace.package]
@@ -64,7 +64,7 @@ log = "^0.4"
mockito = "^1"
murmur3 = "0.5.2"
once_cell = "1"
-opendal = "0.45"
+opendal = "0.46"
ordered-float = "4.0.0"
parquet = "51"
pilota = "0.11.0"
diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index 0508378..8e1c077 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -42,4 +42,5 @@ uuid = { workspace = true }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index f402129..147d86a 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -25,7 +25,6 @@ use iceberg::{
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
use typed_builder::TypedBuilder;
@@ -358,13 +357,10 @@ impl Catalog for GlueCatalog {
let metadata =
TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;
- let mut file = self
- .file_io
+ self.file_io
.new_output(&metadata_location)?
- .writer()
+ .write(serde_json::to_vec(&metadata)?.into())
.await?;
- file.write_all(&serde_json::to_vec(&metadata)?).await?;
- file.shutdown().await?;
let glue_table = convert_to_glue_table(
&table_name,
@@ -431,10 +427,9 @@ impl Catalog for GlueCatalog {
Some(table) => {
let metadata_location =
get_metadata_location(&table.parameters)?;
- let mut reader =
self.file_io.new_input(&metadata_location)?.reader().await?;
- let mut metadata_str = String::new();
- reader.read_to_string(&mut metadata_str).await?;
- let metadata =
serde_json::from_str::<TableMetadata>(&metadata_str)?;
+ let input_file = self.file_io.new_input(&metadata_location)?;
+ let metadata_content = input_file.read().await?;
+ let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let table = Table::builder()
.file_io(self.file_io())
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 5a03221..b539015 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -44,4 +44,5 @@ volo-thrift = { workspace = true }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs
b/crates/catalog/hms/src/catalog.rs
index 2f545dd..18fcacd 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -35,8 +35,6 @@ use iceberg::{
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;
@@ -349,13 +347,10 @@ impl Catalog for HmsCatalog {
let metadata =
TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;
- let mut file = self
- .file_io
+ self.file_io
.new_output(&metadata_location)?
- .writer()
+ .write(serde_json::to_vec(&metadata)?.into())
.await?;
- file.write_all(&serde_json::to_vec(&metadata)?).await?;
- file.shutdown().await?;
let hive_table = convert_to_hive_table(
db_name.clone(),
@@ -406,10 +401,8 @@ impl Catalog for HmsCatalog {
let metadata_location = get_metadata_location(&hive_table.parameters)?;
- let mut reader =
self.file_io.new_input(&metadata_location)?.reader().await?;
- let mut metadata_str = String::new();
- reader.read_to_string(&mut metadata_str).await?;
- let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
+ let metadata_content =
self.file_io.new_input(&metadata_location)?.read().await?;
+ let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let table = Table::builder()
.file_io(self.file_io())
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 7abe9c8..43e5899 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -46,5 +46,6 @@ uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
+opendal = { workspace = true, features = ["services-fs"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index e3f30f8..fe5efac 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -19,14 +19,21 @@
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
+use bytes::Bytes;
+use futures::future::BoxFuture;
use futures::stream::StreamExt;
+use futures::{try_join, TryFutureExt};
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask,
PARQUET_FIELD_ID_META_KEY};
+use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
+use std::ops::Range;
use std::str::FromStr;
+use std::sync::Arc;
use crate::arrow::arrow_schema_to_schema;
-use crate::io::FileIO;
+use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind};
@@ -91,12 +98,12 @@ impl ArrowReader {
Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
- let parquet_reader = file_io
- .new_input(task.data().data_file().file_path())?
- .reader()
- .await?;
+ let parquet_file = file_io
+ .new_input(task.data().data_file().file_path())?;
+ let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
+ let arrow_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader);
- let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(parquet_reader)
+ let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;
let parquet_schema = batch_stream_builder.parquet_schema();
@@ -187,3 +194,43 @@ impl ArrowReader {
}
}
}
+
+/// ArrowFileReader is a wrapper around a FileRead that implements parquet's
AsyncFileReader.
+///
+/// # TODO
+///
+///
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
contains the following hints to speed up metadata loading, we can consider
adding them to this struct:
+///
+/// - `metadata_size_hint`: Provide a hint as to the size of the parquet
file's footer.
+/// - `preload_column_index`: Load the Column Index as part of
[`Self::get_metadata`].
+/// - `preload_offset_index`: Load the Offset Index as part of
[`Self::get_metadata`].
+struct ArrowFileReader<R: FileRead> {
+ meta: FileMetadata,
+ r: R,
+}
+
+impl<R: FileRead> ArrowFileReader<R> {
+ /// Create a new ArrowFileReader
+ fn new(meta: FileMetadata, r: R) -> Self {
+ Self { meta, r }
+ }
+}
+
+impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+ fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_,
parquet::errors::Result<Bytes>> {
+ Box::pin(
+ self.r
+ .read(range.start as _..range.end as _)
+ .map_err(|err|
parquet::errors::ParquetError::External(Box::new(err))),
+ )
+ }
+
+ fn get_metadata(&mut self) -> BoxFuture<'_,
parquet::errors::Result<Arc<ParquetMetaData>>> {
+ Box::pin(async move {
+ let file_size = self.meta.size;
+ let mut loader = MetadataLoader::load(self, file_size as usize,
None).await?;
+ loader.load_page_index(false, false).await?;
+ Ok(Arc::new(loader.finish()))
+ })
+ }
+}
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index d3f07cb..c045b22 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -48,14 +48,13 @@
//! - `new_input`: Create input file for reading.
//! - `new_output`: Create output file for writing.
+use bytes::Bytes;
+use std::ops::Range;
use std::{collections::HashMap, sync::Arc};
use crate::{error::Result, Error, ErrorKind};
-use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
-use tokio::io::AsyncWrite as TokioAsyncWrite;
-use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;
/// Following are arguments for [s3 file
io](https://py.iceberg.apache.org/configuration/#s3).
@@ -206,6 +205,35 @@ impl FileIO {
}
}
+/// The struct that represents the metadata of a file.
+///
+/// TODO: we can add last modified time, content type, etc. in the future.
+pub struct FileMetadata {
+ /// The size of the file.
+ pub size: u64,
+}
+
+/// Trait for reading file.
+///
+/// # TODO
+///
+/// It's possible for us to remove the async_trait, but we need to figure
+/// out how to handle the object safety.
+#[async_trait::async_trait]
+pub trait FileRead: Send + Unpin + 'static {
+ /// Read file content with given range.
+ ///
+ /// TODO: we can support reading non-contiguous bytes in the future.
+ async fn read(&self, range: Range<u64>) -> Result<Bytes>;
+}
+
+#[async_trait::async_trait]
+impl FileRead for opendal::Reader {
+ async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+ Ok(opendal::Reader::read(self, range).await?.to_bytes())
+ }
+}
+
/// Input file is used for reading from files.
#[derive(Debug)]
pub struct InputFile {
@@ -216,14 +244,6 @@ pub struct InputFile {
relative_path_pos: usize,
}
-/// Trait for reading file.
-pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead +
TokioAsyncSeek {}
-
-impl<T> FileRead for T where
- T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
-{
-}
-
impl InputFile {
/// Absolute path to root uri.
pub fn location(&self) -> &str {
@@ -238,16 +258,63 @@ impl InputFile {
.await?)
}
- /// Creates [`InputStream`] for reading.
+ /// Fetches and returns the metadata of the file.
+ pub async fn metadata(&self) -> Result<FileMetadata> {
+ let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
+
+ Ok(FileMetadata {
+ size: meta.content_length(),
+ })
+ }
+
+ /// Reads and returns the whole content of the file.
+ ///
+ /// For continuous reading, use [`Self::reader`] instead.
+ pub async fn read(&self) -> Result<Bytes> {
+ Ok(self
+ .op
+ .read(&self.path[self.relative_path_pos..])
+ .await?
+ .to_bytes())
+ }
+
+ /// Creates [`FileRead`] for continuous reading.
+ ///
+ /// For one-time reading, use [`Self::read`] instead.
pub async fn reader(&self) -> Result<impl FileRead> {
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
}
}
/// Trait for writing file.
-pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
+///
+/// # TODO
+///
+/// It's possible for us to remove the async_trait, but we need to figure
+/// out how to handle the object safety.
+#[async_trait::async_trait]
+pub trait FileWrite: Send + Unpin + 'static {
+ /// Write bytes to file.
+ ///
+ /// TODO: we can support writing non-contiguous bytes in the future.
+ async fn write(&mut self, bs: Bytes) -> Result<()>;
-impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
+ /// Close file.
+ ///
+ /// Calling close on closed file will generate an error.
+ async fn close(&mut self) -> Result<()>;
+}
+
+#[async_trait::async_trait]
+impl FileWrite for opendal::Writer {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ Ok(opendal::Writer::write(self, bs).await?)
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(opendal::Writer::close(self).await?)
+ }
+}
/// Output file is used for writing to files..
#[derive(Debug)]
@@ -282,7 +349,23 @@ impl OutputFile {
}
}
- /// Creates output file for writing.
+ /// Create a new output file with given bytes.
+ ///
+ /// # Notes
+ ///
+ /// Calling `write` will overwrite the file if it exists.
+ /// For continuous writing, use [`Self::writer`].
+ pub async fn write(&self, bs: Bytes) -> Result<()> {
+ let mut writer = self.writer().await?;
+ writer.write(bs).await?;
+ writer.close().await
+ }
+
+ /// Creates output file for continuous writing.
+ ///
+ /// # Notes
+ ///
+ /// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
@@ -398,7 +481,7 @@ mod tests {
use std::{fs::File, path::Path};
use futures::io::AllowStdIo;
- use futures::{AsyncReadExt, AsyncWriteExt};
+ use futures::AsyncReadExt;
use tempfile::TempDir;
@@ -483,9 +566,7 @@ mod tests {
assert!(!output_file.exists().await.unwrap());
{
- let mut writer = output_file.writer().await.unwrap();
- writer.write_all(content.as_bytes()).await.unwrap();
- writer.close().await.unwrap();
+ output_file.write(content.into()).await.unwrap();
}
assert_eq!(&full_path, output_file.location());
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index 3daa5c2..b1eb216 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -28,7 +28,7 @@ use crate::io::OutputFile;
use crate::spec::PartitionField;
use crate::{Error, ErrorKind};
use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as
AvroWriter};
-use futures::AsyncWriteExt;
+use bytes::Bytes;
use serde_json::to_vec;
use std::cmp::min;
use std::collections::HashMap;
@@ -291,13 +291,7 @@ impl ManifestWriter {
let length = avro_writer.flush()?;
let content = avro_writer.into_inner()?;
- let mut writer = self.output.writer().await?;
- writer.write_all(&content).await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "Fail to write Manifest
Entry").with_source(err)
- })?;
- writer.close().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "Fail to write Manifest
Entry").with_source(err)
- })?;
+ self.output.write(Bytes::from(content)).await?;
let partition_summary =
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
diff --git a/crates/iceberg/src/spec/manifest_list.rs
b/crates/iceberg/src/spec/manifest_list.rs
index c390bee..26a4acc 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -22,7 +22,7 @@ use std::{collections::HashMap, str::FromStr};
use crate::io::FileIO;
use crate::{io::OutputFile, spec::Literal, Error, ErrorKind};
use apache_avro::{from_value, types::Value, Reader, Writer};
-use futures::{AsyncReadExt, AsyncWriteExt};
+use bytes::Bytes;
use self::{
_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1,
MANIFEST_LIST_AVRO_SCHEMA_V2},
@@ -212,7 +212,7 @@ impl ManifestListWriter {
pub async fn close(self) -> Result<()> {
let data = self.avro_writer.into_inner()?;
let mut writer = self.output_file.writer().await?;
- writer.write_all(&data).await?;
+ writer.write(Bytes::from(data)).await?;
writer.close().await?;
Ok(())
}
@@ -632,13 +632,7 @@ impl ManifestFile {
///
/// This method will also initialize inherited values of
[`ManifestEntry`], such as `sequence_number`.
pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
- let mut avro = Vec::new();
- file_io
- .new_input(&self.manifest_path)?
- .reader()
- .await?
- .read_to_end(&mut avro)
- .await?;
+ let avro = file_io.new_input(&self.manifest_path)?.read().await?;
let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
diff --git a/crates/iceberg/src/spec/snapshot.rs
b/crates/iceberg/src/spec/snapshot.rs
index 3b4558b..53eee6b 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -20,7 +20,6 @@
*/
use crate::error::Result;
use chrono::{DateTime, TimeZone, Utc};
-use futures::AsyncReadExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
@@ -166,13 +165,7 @@ impl Snapshot {
file_io: &FileIO,
table_metadata: &TableMetadata,
) -> Result<ManifestList> {
- let mut manifest_list_content = Vec::new();
- file_io
- .new_input(&self.manifest_list)?
- .reader()
- .await?
- .read_to_end(&mut manifest_list_content)
- .await?;
+ let manifest_list_content =
file_io.new_input(&self.manifest_list)?.read().await?;
let schema = self.schema(table_metadata)?;
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index f38d771..fd8bd28 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -21,7 +21,6 @@ use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
use crate::Result;
use crate::TableIdent;
-use futures::AsyncReadExt;
use typed_builder::TypedBuilder;
/// Table represents a table in the catalog.
@@ -118,12 +117,8 @@ impl StaticTable {
file_io: FileIO,
) -> Result<Self> {
let metadata_file = file_io.new_input(metadata_file_path)?;
- let mut metadata_file_reader = metadata_file.reader().await?;
- let mut metadata_file_content = String::new();
- metadata_file_reader
- .read_to_string(&mut metadata_file_content)
- .await?;
- let table_metadata =
serde_json::from_str::<TableMetadata>(&metadata_file_content)?;
+ let metadata_file_content = metadata_file.read().await?;
+ let table_metadata =
serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
Self::from_metadata(table_metadata, table_ident, file_io).await
}
@@ -148,6 +143,7 @@ impl StaticTable {
#[cfg(test)]
mod tests {
use super::*;
+
#[tokio::test]
async fn test_static_table_from_file() {
let metadata_file_name = "TableMetadataV2Valid.json";
@@ -211,13 +207,9 @@ mod tests {
.build()
.unwrap();
let metadata_file = file_io.new_input(metadata_file_path).unwrap();
- let mut metadata_file_reader = metadata_file.reader().await.unwrap();
- let mut metadata_file_content = String::new();
- metadata_file_reader
- .read_to_string(&mut metadata_file_content)
- .await
- .unwrap();
- let table_metadata =
serde_json::from_str::<TableMetadata>(&metadata_file_content).unwrap();
+ let metadata_file_content = metadata_file.read().await.unwrap();
+ let table_metadata =
+
serde_json::from_slice::<TableMetadata>(&metadata_file_content).unwrap();
let static_identifier = TableIdent::from_strs(["ns",
"table"]).unwrap();
let table = Table::builder()
.metadata(table_metadata)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index b743d84..a67d308 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -17,12 +17,14 @@
//! The module contains the file writer for parquet file format.
+use std::pin::Pin;
+use std::task::{Context, Poll};
use std::{
collections::HashMap,
sync::{atomic::AtomicI64, Arc},
};
-use crate::{io::FileIO, Result};
+use crate::{io::FileIO, io::FileWrite, Result};
use crate::{
io::OutputFile,
spec::{DataFileBuilder, DataFileFormat},
@@ -30,6 +32,8 @@ use crate::{
Error,
};
use arrow_schema::SchemaRef as ArrowSchemaRef;
+use bytes::Bytes;
+use futures::future::BoxFuture;
use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY,
file::properties::WriterProperties};
@@ -103,7 +107,8 @@ impl<T: LocationGenerator, F: FileNameGenerator>
FileWriterBuilder for ParquetWr
.generate_location(&self.file_name_generator.generate_file_name()),
)?;
let inner_writer = TrackWriter::new(out_file.writer().await?,
written_size.clone());
- let writer = AsyncArrowWriter::try_new(inner_writer,
self.schema.clone(), Some(self.props))
+ let async_writer = AsyncFileWriter::new(inner_writer);
+ let writer = AsyncArrowWriter::try_new(async_writer,
self.schema.clone(), Some(self.props))
.map_err(|err| {
Error::new(
crate::ErrorKind::Unexpected,
@@ -125,7 +130,7 @@ impl<T: LocationGenerator, F: FileNameGenerator>
FileWriterBuilder for ParquetWr
/// `ParquetWriter`` is used to write arrow data into parquet file on storage.
pub struct ParquetWriter {
out_file: OutputFile,
- writer: AsyncArrowWriter<TrackWriter>,
+ writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
written_size: Arc<AtomicI64>,
current_row_num: usize,
field_ids: Vec<i32>,
@@ -246,6 +251,105 @@ impl CurrentFileStatus for ParquetWriter {
}
}
+/// AsyncFileWriter is a wrapper of FileWrite to make it compatible with
tokio::io::AsyncWrite.
+///
+/// # NOTES
+///
+/// This wrapper is kept for internal use only.
+///
+/// # TODO
+///
+/// Maybe we can use the buffer from ArrowWriter directly.
+struct AsyncFileWriter<W: FileWrite>(State<W>);
+
+enum State<W: FileWrite> {
+ Idle(Option<W>),
+ Write(BoxFuture<'static, (W, Result<()>)>),
+ Close(BoxFuture<'static, (W, Result<()>)>),
+}
+
+impl<W: FileWrite> AsyncFileWriter<W> {
+ /// Create a new `AsyncFileWriter` with the given writer.
+ pub fn new(writer: W) -> Self {
+ Self(State::Idle(Some(writer)))
+ }
+}
+
+impl<W: FileWrite> tokio::io::AsyncWrite for AsyncFileWriter<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+ loop {
+ match &mut this.0 {
+ State::Idle(w) => {
+ let mut writer = w.take().unwrap();
+ let bs = Bytes::copy_from_slice(buf);
+ let fut = async move {
+ let res = writer.write(bs).await;
+ (writer, res)
+ };
+ this.0 = State::Write(Box::pin(fut));
+ }
+ State::Write(fut) => {
+ let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
+ this.0 = State::Idle(Some(writer));
+ return Poll::Ready(res.map(|_| buf.len()).map_err(|err| {
+ std::io::Error::new(std::io::ErrorKind::Other,
Box::new(err))
+ }));
+ }
+ State::Close(_) => {
+ return Poll::Ready(Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "file is closed",
+ )));
+ }
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ loop {
+ match &mut this.0 {
+ State::Idle(w) => {
+ let mut writer = w.take().unwrap();
+ let fut = async move {
+ let res = writer.close().await;
+ (writer, res)
+ };
+ this.0 = State::Close(Box::pin(fut));
+ }
+ State::Write(_) => {
+ return Poll::Ready(Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "file is writing",
+ )));
+ }
+ State::Close(fut) => {
+ let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
+ this.0 = State::Idle(Some(writer));
+ return Poll::Ready(res.map_err(|err| {
+ std::io::Error::new(std::io::ErrorKind::Other,
Box::new(err))
+ }));
+ }
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs
b/crates/iceberg/src/writer/file_writer/track_writer.rs
index 938addd..8d0e490 100644
--- a/crates/iceberg/src/writer/file_writer/track_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -15,14 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- pin::Pin,
- sync::{atomic::AtomicI64, Arc},
-};
-
-use tokio::io::AsyncWrite;
+use bytes::Bytes;
+use std::sync::{atomic::AtomicI64, Arc};
use crate::io::FileWrite;
+use crate::Result;
/// `TrackWriter` is used to track the written size.
pub(crate) struct TrackWriter {
@@ -39,34 +36,18 @@ impl TrackWriter {
}
}
-impl AsyncWrite for TrackWriter {
- fn poll_write(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
- match Pin::new(&mut self.inner).poll_write(cx, buf) {
- std::task::Poll::Ready(Ok(n)) => {
- self.written_size
- .fetch_add(buf.len() as i64,
std::sync::atomic::Ordering::Relaxed);
- std::task::Poll::Ready(Ok(n))
- }
- std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)),
- std::task::Poll::Pending => std::task::Poll::Pending,
- }
- }
-
- fn poll_flush(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- Pin::new(&mut self.inner).poll_flush(cx)
+#[async_trait::async_trait]
+impl FileWrite for TrackWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let size = bs.len();
+ self.inner.write(bs).await.map(|v| {
+ self.written_size
+ .fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
+ v
+ })
}
- fn poll_shutdown(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- Pin::new(&mut self.inner).poll_shutdown(cx)
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await
}
}
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 7618d2e..216e94f 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -95,8 +95,6 @@ mod tests {
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
- use bytes::Bytes;
- use futures::AsyncReadExt;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use crate::{
@@ -124,16 +122,11 @@ mod tests {
) {
assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+ let input_file =
file_io.new_input(data_file.file_path.clone()).unwrap();
// read the written file
- let mut input_file = file_io
- .new_input(data_file.file_path.clone())
- .unwrap()
- .reader()
- .await
- .unwrap();
- let mut res = vec![];
- let file_size = input_file.read_to_end(&mut res).await.unwrap();
- let reader_builder =
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+ let input_content = input_file.read().await.unwrap();
+ let reader_builder =
+
ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
let metadata = reader_builder.metadata().clone();
// check data
@@ -154,7 +147,7 @@ mod tests {
.sum::<i64>() as u64
);
- assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+ assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);
assert_eq!(data_file.column_sizes.len(), expect_column_num);
data_file.column_sizes.iter().for_each(|(&k, &v)| {
diff --git a/crates/iceberg/tests/file_io_s3_test.rs
b/crates/iceberg/tests/file_io_s3_test.rs
index 7553bcd..36e24f1 100644
--- a/crates/iceberg/tests/file_io_s3_test.rs
+++ b/crates/iceberg/tests/file_io_s3_test.rs
@@ -17,7 +17,6 @@
//! Integration tests for FileIO S3.
-use futures::{AsyncReadExt, AsyncWriteExt};
use iceberg::io::{
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY,
};
@@ -74,9 +73,7 @@ async fn test_file_io_s3_output() {
.new_output("s3://bucket1/test_output")
.unwrap();
{
- let mut writer = output_file.writer().await.unwrap();
- writer.write_all("123".as_bytes()).await.unwrap();
- writer.close().await.unwrap();
+ output_file.write("123".into()).await.unwrap();
}
assert!(fixture
.file_io
@@ -93,18 +90,16 @@ async fn test_file_io_s3_input() {
.new_output("s3://bucket1/test_input")
.unwrap();
{
- let mut writer = output_file.writer().await.unwrap();
- writer.write_all("test_input".as_bytes()).await.unwrap();
- writer.close().await.unwrap();
+ output_file.write("test_input".into()).await.unwrap();
}
+
let input_file = fixture
.file_io
.new_input("s3://bucket1/test_input")
.unwrap();
+
{
- let mut reader = input_file.reader().await.unwrap();
- let mut buffer = vec![];
- reader.read_to_end(&mut buffer).await.unwrap();
+ let buffer = input_file.read().await.unwrap();
assert_eq!(buffer, "test_input".as_bytes());
}
}
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 9f895ab..4e01723 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -40,4 +40,5 @@ tokio = { workspace = true }
[dev-dependencies]
iceberg-catalog-hms = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }