Ted-Jiang commented on code in PR #2528:
URL: https://github.com/apache/arrow-rs/pull/2528#discussion_r954563200


##########
parquet/src/arrow/array_reader/fixed_len_byte_array.rs:
##########
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
+use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, ValuesBuffer};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{Encoding, Type};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{
+    ArrayDataBuilder, ArrayRef, Decimal128Array, FixedSizeBinaryArray,
+    IntervalDayTimeArray, IntervalYearMonthArray,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{DataType as ArrowType, IntervalUnit};
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
+pub fn make_fixed_len_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
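+    // FIXED_LEN_BYTE_ARRAY declares its width in the schema as type_length
+    // (in bytes); any other physical type is invalid for this reader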
+    let byte_length = match column_desc.physical_type() {
+        Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
+        t => {
+            return Err(general_err!(
+                "invalid physical type for fixed length byte array reader - 
{}",
+                t
+            ))
+        }
+    };
+
+    match &data_type {
+        ArrowType::FixedSizeBinary(_) => {}
+        ArrowType::Decimal128(_, _) => {
+            if byte_length > 16 {
+                return Err(general_err!(
+                    "decimal 128 type too large, must be less than 16 bytes, 
got {}",
+                    byte_length
+                ));
+            }
+        }
+        ArrowType::Interval(_) => {
+            if byte_length != 12 {
+                // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
+                return Err(general_err!(
+                    "interval type must consist of 12 bytes got {}",
+                    byte_length
+                ));
+            }
+        }
+        _ => {
+            return Err(general_err!(
+                "invalid data type for fixed length byte array reader - {}",
+                data_type
+            ))
+        }
+    }
+
+    Ok(Box::new(FixedLenByteArrayReader::new(
+        pages,
+        column_desc,
+        data_type,
+        byte_length,
+    )))
+}
+
+struct FixedLenByteArrayReader {
+    data_type: ArrowType,
+    byte_length: usize,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
+}
+
+impl FixedLenByteArrayReader {
+    fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        data_type: ArrowType,
+        byte_length: usize,
+    ) -> Self {
+        Self {
+            data_type,
+            byte_length,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader: GenericRecordReader::new_with_records(
+                column_desc,
+                FixedLenByteArrayBuffer {
+                    buffer: Default::default(),
+                    byte_length,
+                },
+            ),
+        }
+    }
+}
+
+impl ArrayReader for FixedLenByteArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
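+        // First materialize the raw bytes as a FixedSizeBinary array, then
+        // convert to the requested arrow type below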
+        let record_data = self.record_reader.consume_record_data();
+
+        let array_data =
+            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+                .len(self.record_reader.num_values())
+                .add_buffer(record_data)
+                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+
+        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });
+
+        // TODO: An improvement might be to do this conversion on read
+        let array = match &self.data_type {
+            ArrowType::Decimal128(p, s) => {
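+                // sign_extend_be widens the big-endian value to 16 bytes,
+                // replicating the sign bit, so it can be read as an i128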
+                let decimal = binary
+                    .iter()
+                    .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
+                    .collect::<Decimal128Array>()
+                    .with_precision_and_scale(*p, *s)?;
+
+                Arc::new(decimal)
+            }
+            ArrowType::Interval(unit) => {
+                // An interval is stored as 3x 32-bit unsigned integers storing
+                // months, days, and milliseconds
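+                // Little-endian layout: months in bytes 0..4, days in 4..8,
+                // milliseconds in 8..12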
+                match unit {
+                    IntervalUnit::YearMonth => Arc::new(
+                        binary
+                            .iter()
+                            .map(|o| {
+                                o.map(|b| i32::from_le_bytes(b[0..4].try_into().unwrap()))
+                            })
+                            .collect::<IntervalYearMonthArray>(),
+                    ) as ArrayRef,
+                    IntervalUnit::DayTime => Arc::new(
+                        binary
+                            .iter()
+                            .map(|o| {
+                                o.map(|b| {
+                                    i64::from_le_bytes(b[4..12].try_into().unwrap())
+                                })
+                            })
+                            .collect::<IntervalDayTimeArray>(),
+                    ) as ArrayRef,
+                    IntervalUnit::MonthDayNano => {
+                        return Err(nyi_err!("MonthDayNano intervals not supported"));
+                    }
+                }
+            }
+            _ => Arc::new(binary) as ArrayRef,
+        };
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels();
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
+
+struct FixedLenByteArrayBuffer {
+    buffer: ScalarBuffer<u8>,
+    /// The length of each element in bytes
+    byte_length: usize,
+}
+
+impl ValuesBufferSlice for FixedLenByteArrayBuffer {
+    fn capacity(&self) -> usize {
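+        // The underlying buffer grows on demand, so report an unbounded capacity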
+        usize::MAX
+    }
+}
+
+impl BufferQueue for FixedLenByteArrayBuffer {
+    type Output = Buffer;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
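+        // `len` is a number of values; scale by the element width to split in bytes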
+        self.buffer.split_off(len * self.byte_length)
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
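+        // Values are written directly to the underlying buffer, so the length
+        // should already match; this only verifies the invariant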
+        assert_eq!(self.buffer.len(), len * self.byte_length);
+    }
+}
+
+impl ValuesBuffer for FixedLenByteArrayBuffer {
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
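+        // Shift the densely-packed values out to their decoded level positions,
+        // working backwards so no value is overwritten before it is moved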
+        assert_eq!(
+            self.buffer.len(),
+            (read_offset + values_read) * self.byte_length
+        );
+        self.buffer
+            .resize((read_offset + levels_read) * self.byte_length);
+
+        let slice = self.buffer.as_slice_mut();
+
+        let values_range = read_offset..read_offset + values_read;
+        for (value_pos, level_pos) in
+            values_range.rev().zip(iter_set_bits_rev(valid_mask))
+        {
+            debug_assert!(level_pos >= value_pos);
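+            // Once the two positions coincide, all remaining values are
+            // already in place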
+            if level_pos <= value_pos {
+                break;
+            }
+
+            let level_pos_bytes = level_pos * self.byte_length;
+            let value_pos_bytes = value_pos * self.byte_length;
+
+            for i in 0..self.byte_length {
+                slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+            }
+        }
+    }
+}
+
+struct ValueDecoder {

Review Comment:
   👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
