This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 13cb7a6c1d refactor(integration/parquet): Use ParquetMetaDataReader instead (#5170)
13cb7a6c1d is described below
commit 13cb7a6c1d36ae6374248e49f345c44262302a38
Author: Xuanwo <[email protected]>
AuthorDate: Tue Oct 8 11:58:32 2024 +0800
refactor(integration/parquet): Use ParquetMetaDataReader instead (#5170)
Signed-off-by: Xuanwo <[email protected]>
---
integrations/parquet/Cargo.toml | 4 ++--
integrations/parquet/src/async_reader.rs | 40 +++++---------------------------
2 files changed, 8 insertions(+), 36 deletions(-)
diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
index 153beb0511..61cc78188d 100644
--- a/integrations/parquet/Cargo.toml
+++ b/integrations/parquet/Cargo.toml
@@ -32,13 +32,13 @@ async-trait = "0.1"
bytes = "1"
futures = "0.3"
opendal = { version = "0.50.0", path = "../../core" }
-parquet = { version = "53.0", default-features = false, features = [
+parquet = { version = "53.1", default-features = false, features = [
"async",
"arrow",
] }
[dev-dependencies]
-arrow = { version = "53.0" }
+arrow = { version = "53.1" }
opendal = { version = "0.50.0", path = "../../core", features = [
"services-memory",
"services-s3",
diff --git a/integrations/parquet/src/async_reader.rs b/integrations/parquet/src/async_reader.rs
index 4bffdd0600..d4cc308be6 100644
--- a/integrations/parquet/src/async_reader.rs
+++ b/integrations/parquet/src/async_reader.rs
@@ -24,8 +24,8 @@ use futures::FutureExt;
use opendal::Reader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::{ParquetError, Result as ParquetResult};
-use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::FOOTER_SIZE;
const PREFETCH_FOOTER_SIZE: usize = 512 * 1024;
@@ -156,40 +156,12 @@ impl AsyncFileReader for AsyncReader {
fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<std::sync::Arc<ParquetMetaData>>> {
async move {
- let prefetched_footer_content = self
- .inner
- .read(self.content_length - self.prefetch_footer_size as u64..self.content_length)
- .await
- .map_err(|err| ParquetError::External(Box::new(err)))?;
- let prefetched_footer_length = prefetched_footer_content.len();
-
- // Decode the metadata length from the last 8 bytes of the file.
- let metadata_length = {
- let buf = &prefetched_footer_content
- .slice((prefetched_footer_length - FOOTER_SIZE)..prefetched_footer_length);
- // Safety: checked above.
- let buf: [u8; 8] = buf.to_vec().try_into().unwrap();
- decode_footer(&buf)?
- };
-
- // Try to read the metadata from the `prefetched_footer_content`.
- // Otherwise, fetch exact metadata from the remote.
- let buf = if prefetched_footer_length >= metadata_length + FOOTER_SIZE {
- prefetched_footer_content.slice(
- (prefetched_footer_length - metadata_length - FOOTER_SIZE)
- ..(prefetched_footer_length - FOOTER_SIZE),
- )
- } else {
- self.inner
- .read(
- self.content_length - metadata_length as u64 - FOOTER_SIZE as u64
- ..self.content_length - FOOTER_SIZE as u64,
- )
- .await
- .map_err(|err| ParquetError::External(Box::new(err)))?
- };
+ let reader =
+ ParquetMetaDataReader::new().with_prefetch_hint(Some(self.prefetch_footer_size));
+ let size = self.content_length as usize;
+ let meta = reader.load_and_finish(self, size).await?;
- Ok(Arc::new(decode_metadata(&buf.to_vec())?))
+ Ok(Arc::new(meta))
}
.boxed()
}