This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 76af889 build: Restore CI by making parquet and arrow version consistent (#280)
76af889 is described below
commit 76af8898028aa69ed4abc749c1af4b74776b910c
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Mar 18 22:22:27 2024 -0700
build: Restore CI by making parquet and arrow version consistent (#280)
---
Cargo.toml | 16 +++++------
crates/iceberg/src/transform/temporal.rs | 18 ++++++------
.../src/writer/file_writer/parquet_writer.rs | 33 ++++++----------------
3 files changed, 24 insertions(+), 43 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 809fc4f..7da16e0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,10 @@
[workspace]
resolver = "2"
members = [
- "crates/catalog/*",
- "crates/examples",
- "crates/iceberg",
- "crates/test_utils",
+ "crates/catalog/*",
+ "crates/examples",
+ "crates/iceberg",
+ "crates/test_utils",
]
[workspace.package]
@@ -37,9 +37,9 @@ rust-version = "1.75.0"
anyhow = "1.0.72"
apache-avro = "0.16"
array-init = "2"
-arrow-arith = { version = ">=46" }
-arrow-array = { version = ">=46" }
-arrow-schema = { version = ">=46" }
+arrow-arith = { version = "51" }
+arrow-array = { version = "51" }
+arrow-schema = { version = "51" }
async-stream = "0.3.5"
async-trait = "0.1"
bimap = "0.6"
@@ -61,7 +61,7 @@ murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
-parquet = "50"
+parquet = "51"
pilota = "0.10.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs
index 7b8deb1..4556543 100644
--- a/crates/iceberg/src/transform/temporal.rs
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -17,10 +17,8 @@
use super::TransformFunction;
use crate::{Error, ErrorKind, Result};
-use arrow_arith::{
- arity::binary,
- temporal::{month_dyn, year_dyn},
-};
+use arrow_arith::temporal::DatePart;
+use arrow_arith::{arity::binary, temporal::date_part};
use arrow_array::{
types::Date32Type, Array, ArrayRef, Date32Array, Int32Array,
TimestampMicrosecondArray,
};
@@ -43,8 +41,8 @@ pub struct Year;
impl TransformFunction for Year {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
- let array =
- year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+ let array = date_part(&input, DatePart::Year)
+ .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
Ok(Arc::<Int32Array>::new(
array
.as_any()
@@ -61,15 +59,15 @@ pub struct Month;
impl TransformFunction for Month {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
- let year_array =
- year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+ let year_array = date_part(&input, DatePart::Year)
+ .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
let year_array: Int32Array = year_array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
- let month_array =
- month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+ let month_array = date_part(&input, DatePart::Month)
+ .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
Ok(Arc::<Int32Array>::new(
binary(
month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bb4550f..3ec1a1b 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -18,7 +18,6 @@
//! The module contains the file writer for parquet file format.
use std::{
- cmp::max,
collections::HashMap,
sync::{atomic::AtomicI64, Arc},
};
@@ -43,9 +42,6 @@ use super::{
/// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
#[derive(Clone)]
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
- /// `buffer_size` determines the initial size of the intermediate buffer.
- /// The intermediate buffer will automatically be resized if necessary
- init_buffer_size: usize,
props: WriterProperties,
schema: ArrowSchemaRef,
@@ -55,13 +51,9 @@ pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
}
impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
- /// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it.
- const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024;
-
/// Create a new `ParquetWriterBuilder`
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
pub fn new(
- init_buffer_size: usize,
props: WriterProperties,
schema: ArrowSchemaRef,
file_io: FileIO,
@@ -69,7 +61,6 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
file_name_generator: F,
) -> Self {
Self {
- init_buffer_size,
props,
schema,
file_io,
@@ -112,20 +103,14 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
.generate_location(&self.file_name_generator.generate_file_name()),
)?;
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
- let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size);
- let writer = AsyncArrowWriter::try_new(
- inner_writer,
- self.schema.clone(),
- init_buffer_size,
- Some(self.props),
- )
- .map_err(|err| {
- Error::new(
- crate::ErrorKind::Unexpected,
- "Failed to build parquet writer.",
- )
- .with_source(err)
- })?;
+ let writer = AsyncArrowWriter::try_new(inner_writer, self.schema.clone(), Some(self.props))
+ .map_err(|err| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ "Failed to build parquet writer.",
+ )
+ .with_source(err)
+ })?;
Ok(ParquetWriter {
writer,
@@ -311,7 +296,6 @@ mod tests {
// write data
let mut pw = ParquetWriterBuilder::new(
- 0,
WriterProperties::builder().build(),
to_write.schema(),
file_io.clone(),
@@ -551,7 +535,6 @@ mod tests {
// write data
let mut pw = ParquetWriterBuilder::new(
- 0,
WriterProperties::builder().build(),
to_write.schema(),
file_io.clone(),