nevi-me commented on a change in pull request #8402: URL: https://github.com/apache/arrow/pull/8402#discussion_r512229780
########## File path: rust/parquet/src/arrow/array_reader.rs ########## @@ -267,90 +267,79 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> { } } - // convert to arrays - let array = - match (&self.data_type, T::get_physical_type()) { - (ArrowType::Boolean, PhysicalType::BOOLEAN) => { - BoolConverter::new(BooleanArrayConverter {}) - .convert(self.record_reader.cast::<BoolType>()) - } - (ArrowType::Int8, PhysicalType::INT32) => { - Int8Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::Int16, PhysicalType::INT32) => { - Int16Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::Int32, PhysicalType::INT32) => { - Int32Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::UInt8, PhysicalType::INT32) => { - UInt8Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::UInt16, PhysicalType::INT32) => { - UInt16Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::UInt32, PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::Int64, PhysicalType::INT64) => { - Int64Converter::new().convert(self.record_reader.cast::<Int64Type>()) - } - (ArrowType::UInt64, PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::<Int64Type>()) - } - (ArrowType::Float32, PhysicalType::FLOAT) => Float32Converter::new() - .convert(self.record_reader.cast::<FloatType>()), - (ArrowType::Float64, PhysicalType::DOUBLE) => Float64Converter::new() - .convert(self.record_reader.cast::<DoubleType>()), - (ArrowType::Timestamp(unit, _), PhysicalType::INT64) => match unit { - TimeUnit::Millisecond => TimestampMillisecondConverter::new() - .convert(self.record_reader.cast::<Int64Type>()), - TimeUnit::Microsecond => TimestampMicrosecondConverter::new() - .convert(self.record_reader.cast::<Int64Type>()), - _ => Err(general_err!("No conversion from parquet type to 
arrow type for timestamp with unit {:?}", unit)), - }, - (ArrowType::Date32(unit), PhysicalType::INT32) => match unit { - DateUnit::Day => Date32Converter::new() - .convert(self.record_reader.cast::<Int32Type>()), - _ => Err(general_err!("No conversion from parquet type to arrow type for date with unit {:?}", unit)), - } - (ArrowType::Time32(unit), PhysicalType::INT32) => { - match unit { - TimeUnit::Second => { - Time32SecondConverter::new().convert(self.record_reader.cast::<Int32Type>()) - } - TimeUnit::Millisecond => { - Time32MillisecondConverter::new().convert(self.record_reader.cast::<Int32Type>()) - } - _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) - } - } - (ArrowType::Time64(unit), PhysicalType::INT64) => { - match unit { - TimeUnit::Microsecond => { - Time64MicrosecondConverter::new().convert(self.record_reader.cast::<Int64Type>()) - } - TimeUnit::Nanosecond => { - Time64NanosecondConverter::new().convert(self.record_reader.cast::<Int64Type>()) - } - _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) - } - } - (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::<Int32Type>()) - } - (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::<Int64Type>()) - } - (ArrowType::Duration(_), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::<Int64Type>()) - } - (arrow_type, physical_type) => Err(general_err!( - "Reading {:?} type from parquet {:?} is not supported yet.", - arrow_type, - physical_type - )), - }?; + let arrow_data_type = match T::get_physical_type() { + PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE, + PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE, + PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE, + PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE, + 
PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE, + PhysicalType::INT96 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + unreachable!( + "PrimitiveArrayReaders don't support complex physical types" + ); + } + }; + + // Convert to arrays by using the Parquet phyisical type. + // The physical types are then cast to Arrow types if necessary + + let mut record_data = self.record_reader.consume_record_data()?; + + if T::get_physical_type() == PhysicalType::BOOLEAN { + let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); + + for e in record_data.data() { + boolean_buffer.append(*e > 0)?; + } + record_data = boolean_buffer.finish(); + } + + let mut array_data = ArrayDataBuilder::new(arrow_data_type) + .len(self.record_reader.num_values()) + .add_buffer(record_data); + + if let Some(b) = self.record_reader.consume_bitmap_buffer()? { + array_data = array_data.null_bit_buffer(b); + } + + let array = match T::get_physical_type() { + PhysicalType::BOOLEAN => { + Arc::new(PrimitiveArray::<ArrowBooleanType>::from(array_data.build())) + as ArrayRef + } + PhysicalType::INT32 => { + Arc::new(PrimitiveArray::<ArrowInt32Type>::from(array_data.build())) + as ArrayRef + } + PhysicalType::INT64 => { + Arc::new(PrimitiveArray::<ArrowInt64Type>::from(array_data.build())) + as ArrayRef + } + PhysicalType::FLOAT => { + Arc::new(PrimitiveArray::<ArrowFloat32Type>::from(array_data.build())) + as ArrayRef + } + PhysicalType::DOUBLE => { + Arc::new(PrimitiveArray::<ArrowFloat64Type>::from(array_data.build())) + as ArrayRef + } + PhysicalType::INT96 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + unreachable!( + "PrimitiveArrayReaders don't support complex physical types" + ); + } + }; + + // cast to Arrow type + // TODO: we need to check if it's fine for this to be fallible. 
+ // My assumption is that we can't get to an illegal cast as we can only + // generate types that are supported, because we'd have gotten them from + // the metadata which was written to the Parquet sink Review comment: It might be involved, and maybe something for a future PR. We don't want a cast to fail here, as we should ideally determine earlier on that we can't write out whatever the target datatype is. A failure could only occur if we're trying to cast a primitive to some unsupported type, which might be impossible assuming that we've covered all casts. @alamb any opinion here? I think you covered the remaining casts that were missing a few weeks ago. Intervals are not an issue as they won't use the primitive types. We can leave the TODO; I'll remove it in a future PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org