scovich commented on code in PR #7535:
URL: https://github.com/apache/arrow-rs/pull/7535#discussion_r2103257749


##########
parquet-variant/src/decoder.rs:
##########
@@ -67,56 +69,30 @@ fn map_try_from_slice_error(e: TryFromSliceError) -> ArrowError {
     ArrowError::InvalidArgumentError(e.to_string())
 }
 
-/// Constructs the error message for an invalid UTF-8 string.
-fn invalid_utf8_err() -> ArrowError {
-    ArrowError::InvalidArgumentError("invalid UTF-8 string".to_string())
-}
-
 /// Decodes an Int8 from the value section of a variant.
 pub(crate) fn decode_int8(value: &[u8]) -> Result<i8, ArrowError> {
-    if value.is_empty() {
-        return Err(ArrowError::InvalidArgumentError(
-            "Got empty value buffer so can't decode into int8.".to_string(),
-        ));
-    }
-    let value = i8::from_le_bytes([value[1]]);
+    let value = i8::from_le_bytes(array_from_slice(value, 1)?);
     Ok(value)
 }
 
 /// Decodes a long string from the value section of a variant.
 pub(crate) fn decode_long_string(value: &[u8]) -> Result<&str, ArrowError> {
-    if value.len() < 5 {
-        return Err(ArrowError::InvalidArgumentError(
-            "Tried to decode value buffer into long_string, but it's too short 
(len<5)."
-                .to_string(),
-        ));
-    }
-    let len =
-        u32::from_le_bytes(value[1..=4].try_into().map_err(map_try_from_slice_error)?) as usize;
-    if value.len() < len + 5 {
-        let err_str = format!("The length of the buffer for the long_string is soo short, it is {} and it should be at least {} ({} < {} + 5)", value.len(), len + 5 , value.len(), len);
-        return Err(ArrowError::InvalidArgumentError(err_str));
-    }
-    let string_bytes = &value[5..5 + len];
-    let string = str::from_utf8(string_bytes).map_err(|_| invalid_utf8_err())?;
+    let len = u32::from_le_bytes(
+        slice_from_slice(value, 1..5)?
+            .try_into()
+            .map_err(map_try_from_slice_error)?,
+    ) as usize;
+    let string =
+        str::from_utf8(slice_from_slice(value, 5..5 + len)?).map_err(|_| invalid_utf8_err())?;

Review Comment:
   Basically every caller ends up doing `offset..offset+len`, so should the method just receive `(offset, len)` pairs instead? There's technically a risk of causing an overflow panic when adding the two values inside... but we add them outside without a check right now. If anything, pulling the add operation inside (where it can be checked) would be safer.
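
   A minimal sketch of that shape, reusing this PR's `slice_from_slice` and `ArrowError` (the helper name `slice_from_offset_len` and the error text are illustrative, not from the PR):
    ```rust
    use arrow_schema::ArrowError;

    /// Hypothetical (offset, len) flavor of `slice_from_slice`: the addition
    /// happens inside, where it can be overflow-checked.
    pub(crate) fn slice_from_offset_len(
        bytes: &[u8],
        offset: usize,
        len: usize,
    ) -> Result<&[u8], ArrowError> {
        let end = offset.checked_add(len).ok_or_else(|| {
            ArrowError::InvalidArgumentError(format!(
                "offset {offset} + len {len} overflows usize"
            ))
        })?;
        slice_from_slice(bytes, offset..end)
    }
    ```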



##########
parquet-variant/src/utils.rs:
##########
@@ -0,0 +1,42 @@
+use std::{array::TryFromSliceError, ops::Range};
+
+use arrow_schema::ArrowError;
+
+#[inline]
+pub(crate) fn slice_from_slice(bytes: &[u8], range: Range<usize>) -> Result<&[u8], ArrowError> {
+    bytes.get(range.clone()).ok_or_else(|| {

Review Comment:
   aside: Strange that `Range` lacks an `impl<Idx: Copy> Copy for Range<Idx>`



##########
parquet-variant/src/decoder.rs:
##########
@@ -67,56 +69,30 @@ fn map_try_from_slice_error(e: TryFromSliceError) -> ArrowError {
     ArrowError::InvalidArgumentError(e.to_string())
 }
 
-/// Constructs the error message for an invalid UTF-8 string.
-fn invalid_utf8_err() -> ArrowError {
-    ArrowError::InvalidArgumentError("invalid UTF-8 string".to_string())
-}
-
 /// Decodes an Int8 from the value section of a variant.
 pub(crate) fn decode_int8(value: &[u8]) -> Result<i8, ArrowError> {
-    if value.is_empty() {
-        return Err(ArrowError::InvalidArgumentError(
-            "Got empty value buffer so can't decode into int8.".to_string(),
-        ));
-    }
-    let value = i8::from_le_bytes([value[1]]);
+    let value = i8::from_le_bytes(array_from_slice(value, 1)?);
     Ok(value)
 }
 
 /// Decodes a long string from the value section of a variant.
 pub(crate) fn decode_long_string(value: &[u8]) -> Result<&str, ArrowError> {
-    if value.len() < 5 {
-        return Err(ArrowError::InvalidArgumentError(
-            "Tried to decode value buffer into long_string, but it's too short 
(len<5)."
-                .to_string(),
-        ));
-    }
-    let len =
-        u32::from_le_bytes(value[1..=4].try_into().map_err(map_try_from_slice_error)?) as usize;
-    if value.len() < len + 5 {
-        let err_str = format!("The length of the buffer for the long_string is soo short, it is {} and it should be at least {} ({} < {} + 5)", value.len(), len + 5 , value.len(), len);
-        return Err(ArrowError::InvalidArgumentError(err_str));
-    }
-    let string_bytes = &value[5..5 + len];
-    let string = str::from_utf8(string_bytes).map_err(|_| invalid_utf8_err())?;
+    let len = u32::from_le_bytes(
+        slice_from_slice(value, 1..5)?
+            .try_into()
+            .map_err(map_try_from_slice_error)?,
+    ) as usize;

Review Comment:
   ```suggestion
       let len = u32::from_le_bytes(array_from_slice(value, 1)?) as usize;
   ```



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0

Review Comment:
   It _should_ always be 0... but it's still materialized, so we still have to check it.
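
   For instance, the validation pass could check the materialized first offset explicitly instead of skipping it (a sketch reusing `unpack_usize` from this PR; the error text is illustrative):
    ```rust
    // Sketch: the first materialized offset must be 0 per the spec.
    let first_offset = header.offset_size.unpack_usize(bytes, 1, 1)?;
    if first_offset != 0 {
        return Err(ArrowError::InvalidArgumentError(format!(
            "First offset must be 0, got {first_offset}"
        )));
    }
    ```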



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {

Review Comment:
   qq: do we prefer `dict_size()` here? Rust is perfectly happy to accept that.



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {

Review Comment:
   we're not getting an offset but rather the range of bytes to fetch for a given entry?



##########
parquet-variant/src/variant.rs:
##########
@@ -0,0 +1,321 @@
+use std::{borrow::Cow, ops::Index};
+
+use crate::decoder::{self, get_variant_type};
+use arrow_schema::ArrowError;
+use strum_macros::EnumDiscriminants;
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+/// Encodes the Variant Metadata, see the Variant spec file for more information

Review Comment:
   Yeah... and the use sites in object and array variant would anyway be the more likely performance bottleneck. Unless we want to define a different enum variant for each of those as well... 4*4 (object) + 4 (array) = 20 total combinations... seems excessive. A middle ground would be to special-case just the common ones (e.g. object where both sizes are 1 or both are 4; array where the size is 1 or 4). But it's still extra complexity for likely a very small gain. We'd be trading off more match arms in one place in order to eliminate match arms somewhere nearby.
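
   For illustration, that middle ground might look roughly like this (the type and variant names are hypothetical, not from this PR):
    ```rust
    /// Hypothetical special-casing of the common width combinations for an
    /// object, with a general fallback for the remaining ones.
    enum ObjectOffsetWidths {
        /// Both field-id and field-offset widths are 1 byte.
        BothOne,
        /// Both widths are 4 bytes.
        BothFour,
        /// Any other (field_id_size, field_offset_size) combination.
        General(OffsetSizeBytes, OffsetSizeBytes),
    }
    ```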



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),
         }
-        Ok(self.bytes[0])
     }
 
-    /// Get the offset_minus_one value from the header
-    pub fn offset_size_minus_one(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            Err(ArrowError::InvalidArgumentError(
-                "Tried to get offset_size_minus_one from header, but 
self.bytes buffer is emtpy."
-                    .to_string(),
-            ))
-        } else {
-            Ok(self.bytes[0] & (0b11 << 6)) // Grab the last 2 bits
-        }
+    /// Gets the field using an offset (Range) - helper method to keep consistent API.
+    pub fn get_field_by_offset(&self, offset: Range<usize>) -> Result<&'m str, ArrowError> {
+        let dictionary_key_start_byte = 1 // header
+                    + self.header.offset_size as usize // dictionary_size field itself
+                    + (self.dict_size + 1) * (self.header.offset_size as usize); // all offset entries
+        let dictionary_keys_bytes =
+            slice_from_slice(self.bytes, dictionary_key_start_byte..self.bytes.len())?;
+        let dictionary_key_bytes =
+            slice_from_slice(dictionary_keys_bytes, offset.start..offset.end)?;
+        let result = str::from_utf8(dictionary_key_bytes).map_err(|_| invalid_utf8_err())?;
+        Ok(result)
     }
 
-    /// Get the offset_size
-    pub fn offset_size(&self) -> Result<u8, ArrowError> {
-        Ok(self.offset_size_minus_one()? + 1)
+    pub fn header(&self) -> VariantMetadataHeader {
+        self.header
     }
 
     /// Get the offsets as an iterator
-    // TODO: Do we want this kind of API?
-    // TODO: Test once API is agreed upon
-    pub fn offsets(&'m self) -> Result<impl Iterator<Item = (usize, usize)> + 'm, ArrowError> {
+    // TODO: Write tests
+    pub fn offsets(
+        &'m self,
+    ) -> Result<impl Iterator<Item = Result<Range<usize>, ArrowError>> + 'm, ArrowError> {
         struct OffsetIterators<'m> {
             buffer: &'m [u8],
+            header: &'m VariantMetadataHeader,
             dict_len: usize,
             seen: usize,
-            offset_size: usize,
         }
         impl<'m> Iterator for OffsetIterators<'m> {
-            type Item = (usize, usize); // (start, end) positions of the bytes
-
-            // TODO: Check bounds here or ensure they're correct
-
+            type Item = Result<Range<usize>, ArrowError>; // Range = (start, end) positions of the bytes
             fn next(&mut self) -> Option<Self::Item> {
-                // +1 to skip the first offset
                 if self.seen < self.dict_len {
-                    let start = usize::from_le_bytes(
-                        self.buffer[(self.seen ) * self.offset_size + 1 // +1 to skip header
-                            ..(self.seen ) * self.offset_size + 1]
-                            .try_into()
-                            .ok()?,
-                    );
+                    let start = self
+                        .header
+                        .offset_size
+                        // skip header via byte_offset=1 and self.seen + 1 because first is dictionary_size
+                        .unpack_usize(self.buffer, 1, self.seen + 1);
+
+                    let end = self
+                        .header
+                        .offset_size
+                        // skip header via byte_offset=1 and self.seen + 2 to get end offset
+                        .unpack_usize(self.buffer, 1, self.seen + 2);
                     self.seen += 1;
-                    let end = usize::from_le_bytes(
-                        self.buffer[(self.seen ) * self.offset_size + 1 // +1 to skip header
-                            ..(self.seen ) * self.offset_size + 1]
-                            .try_into()
-                            .ok()?,
-                    );
-
-                    Some((start, end))
+                    match (start, end) {
+                        (Ok(start), Ok(end)) => Some(Ok(start..end)),
+                        (Err(e), _) | (_, Err(e)) => Some(Err(e)),
+                    }
                 } else {
                     None
                 }
             }
         }
         let iterator: OffsetIterators = OffsetIterators {
             buffer: self.bytes,
-            dict_len: self.dict_len()?,
+            header: &self.header,
+            dict_len: self.dict_size,
             seen: 0,
-            offset_size: self.offset_size()? as usize,
         };
         Ok(iterator)
     }
-    /// Get the key-name-bytes by index
-    pub fn get_by(&self, index: usize) -> Result<&'m str, ArrowError> {
-        todo!()
-    }
+
     /// Get all key-names as an Iterator of strings
-    // TODO: Result
-    pub fn fields(&self) -> impl Iterator<Item = &'m str> {
-        // Do the same as for offsets
-        todo!();
-        vec![].into_iter()
+    // NOTE: Duplicated code due to issues putting Impl's on structs, this is the same as `.offsets` except it
+    // extracts the field using the offset instead of returning the offset.
+    pub fn fields(

Review Comment:
   Isn't this just:
   ```rust
    pub fn fields(
        &'m self,
    ) -> Result<impl Iterator<Item = Result<&'m str, ArrowError>> + 'm, ArrowError> {
        let bytes = self.bytes;
        Ok(self.offsets()?.map(move |range| string_from_slice(bytes, range?)))
   }
   ```



##########
parquet-variant/src/utils.rs:
##########
@@ -0,0 +1,42 @@
+use std::{array::TryFromSliceError, ops::Range};
+
+use arrow_schema::ArrowError;
+
+#[inline]
+pub(crate) fn slice_from_slice(bytes: &[u8], range: Range<usize>) -> Result<&[u8], ArrowError> {
+    bytes.get(range.clone()).ok_or_else(|| {
+        ArrowError::InvalidArgumentError(format!(
+            "Tried to extract {} bytes at offset {} from {}-byte buffer",
+            range.end - range.start,
+            range.start,
+            bytes.len(),
+        ))

Review Comment:
   The start and end usually come from variant data. If the caller didn't validate them, we could have received a start larger than end, which would cause the `slice::get` call to return None and subsequently cause a panic here due to integer underflow. We should probably do `range.end.checked_sub(range.start)` here, and return a different error in case of an invalid range?
   ```suggestion
    bytes.get(range.clone()).ok_or_else(|| match range.end.checked_sub(range.start) {
           Some(len) => ArrowError::InvalidArgumentError(format!(
               "Tried to extract {len} bytes at offset {} from {}-byte buffer",
               range.start,
               bytes.len(),
           )),
           None => ArrowError::InvalidArgumentError(format!(
               "Invalid range: {}..{}", range.start, range.end,
           )),
   ```



##########
parquet-variant/src/utils.rs:
##########
@@ -0,0 +1,42 @@
+use std::{array::TryFromSliceError, ops::Range};
+
+use arrow_schema::ArrowError;
+
+#[inline]
+pub(crate) fn slice_from_slice(bytes: &[u8], range: Range<usize>) -> Result<&[u8], ArrowError> {
+    bytes.get(range.clone()).ok_or_else(|| {
+        ArrowError::InvalidArgumentError(format!(
+            "Tried to extract {} bytes at offset {} from {}-byte buffer",
+            range.end - range.start,
+            range.start,
+            bytes.len(),
+        ))
+    })
+}
+pub(crate) fn array_from_slice<const N: usize>(
+    bytes: &[u8],
+    offset: usize,
+) -> Result<[u8; N], ArrowError> {
+    let bytes = slice_from_slice(bytes, offset..offset + N)?;
+    bytes.try_into().map_err(map_try_from_slice_error)
+}
+
+/// To be used in `map_err` when unpacking an integer from a slice of bytes.
+pub(crate) fn map_try_from_slice_error(e: TryFromSliceError) -> ArrowError {
+    ArrowError::InvalidArgumentError(e.to_string())
+}
+
+pub(crate) fn non_empty_slice(slice: &[u8]) -> Result<&[u8], ArrowError> {
+    if slice.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Received empty bytes".to_string(),
+        ));
+    } else {
+        return Ok(slice);
+    }

Review Comment:
   All the callers immediately follow with `?[0]` -- should we rename+rework this as a thin wrapper around the slice `get` method?
   ```suggestion
   pub(crate) fn first_byte_from_slice(slice: &[u8]) -> Result<u8, ArrowError> {
        slice.get(0).copied().ok_or_else(|| {
            ArrowError::InvalidArgumentError("Received empty bytes".to_string())
        })
   ```
   That way callers would do e.g.:
   ```rust
   let len = (first_byte_from_slice(value)? >> 2) as usize;
   ```



##########
parquet-variant/src/variant.rs:
##########
@@ -1,14 +1,105 @@
-use std::ops::Index;
-
 use crate::decoder::{
    self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
+use crate::utils::{array_from_slice, invalid_utf8_err, non_empty_slice, slice_from_slice};
 use arrow_schema::ArrowError;
+use std::{
+    num::TryFromIntError,
+    ops::{Index, Range},
+    str,
+};
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            // TODO: Do this one
+            Three => todo!(),
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,

Review Comment:
   aside: I would not have guessed this needed type annotations



##########
parquet-variant/src/utils.rs:
##########
@@ -0,0 +1,42 @@
+use std::{array::TryFromSliceError, ops::Range};
+
+use arrow_schema::ArrowError;
+
+#[inline]
+pub(crate) fn slice_from_slice(bytes: &[u8], range: Range<usize>) -> Result<&[u8], ArrowError> {
+    bytes.get(range.clone()).ok_or_else(|| {
+        ArrowError::InvalidArgumentError(format!(
+            "Tried to extract {} bytes at offset {} from {}-byte buffer",
+            range.end - range.start,
+            range.start,
+            bytes.len(),
+        ))
+    })
+}
+pub(crate) fn array_from_slice<const N: usize>(
+    bytes: &[u8],
+    offset: usize,
+) -> Result<[u8; N], ArrowError> {
+    let bytes = slice_from_slice(bytes, offset..offset + N)?;
+    bytes.try_into().map_err(map_try_from_slice_error)
+}
+
+/// To be used in `map_err` when unpacking an integer from a slice of bytes.
+pub(crate) fn map_try_from_slice_error(e: TryFromSliceError) -> ArrowError {
+    ArrowError::InvalidArgumentError(e.to_string())
+}
+
+pub(crate) fn non_empty_slice(slice: &[u8]) -> Result<&[u8], ArrowError> {
+    if slice.is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Received empty bytes".to_string(),
+        ));
+    } else {
+        return Ok(slice);
+    }
+}
+
+/// Constructs the error message for an invalid UTF-8 string.
+pub(crate) fn invalid_utf8_err() -> ArrowError {
+    ArrowError::InvalidArgumentError("invalid UTF-8 string".to_string())

Review Comment:
   All (four?) callers of this method do the same thing... should we pull it in?
   ```suggestion
    pub(crate) fn string_from_slice(slice: &[u8], range: Range<usize>) -> Result<&str, ArrowError> {
        str::from_utf8(slice_from_slice(slice, range)?)
            .map_err(|_| ArrowError::InvalidArgumentError("invalid UTF-8 string".to_string()))
   ```
   The encapsulation would be especially helpful for the field iterator call site:
   
   <details>
   
   ```rust
                       let result = match (start, end) {
                           (Ok(start), Ok(end)) => {
                               // Try to get the slice
                                match slice_from_slice(self.buffer, 1 + start..1 + end) {
                                   // Get the field and return it
                                    Ok(bytes) => str::from_utf8(bytes).map_err(|_| invalid_utf8_err()),
                                   Err(e) => Err(e),
                               }
                           }
                           (Err(e), _) | (_, Err(e)) => Err(e),
                       };
   ```
   simplifies to
   ```rust
                       let result = match (start, end) {
                        (Ok(start), Ok(end)) => string_from_slice(self.buffer, 1 + start..1 + end),
                           (Err(e), _) | (_, Err(e)) => Err(e),
                       };
   ```
   (aside: Nice match arm for handling the not-both-ok case!)
   
   </details>



##########
parquet-variant/src/variant.rs:
##########
@@ -1,14 +1,105 @@
-use std::ops::Index;
-
 use crate::decoder::{
    self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
+use crate::utils::{array_from_slice, invalid_utf8_err, non_empty_slice, slice_from_slice};
 use arrow_schema::ArrowError;
+use std::{
+    num::TryFromIntError,
+    ops::{Index, Range},
+    str,
+};
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            // TODO: Do this one
+            Three => todo!(),
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
+        };
+        Ok(result)
+    }
+}
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) struct VariantMetadataHeader {
+    version: u8,
+    is_sorted: bool,
+    /// Note: This is `offset_size_minus_one` + 1
+    offset_size: OffsetSizeBytes,
+}
+
+impl<'m> VariantMetadataHeader {

Review Comment:
   Why needed? The struct doesn't keep any references to the initial byte slice, so `try_new` should be able to accept a plain `bytes: &[u8]`?
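
   i.e., something like this sketch (the parsing body is unchanged from the diff above):
    ```rust
    impl VariantMetadataHeader {
        // Sketch: the header borrows nothing from `bytes`, so no `'m` is needed.
        pub fn try_new(bytes: &[u8]) -> Result<Self, ArrowError> {
            let _ = bytes; // sketch only; the real body parses `bytes` as in the diff above
            todo!()
        }
    }
    ```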



##########
parquet-variant/src/variant.rs:
##########
@@ -1,14 +1,105 @@
-use std::ops::Index;
-
 use crate::decoder::{
    self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
+use crate::utils::{array_from_slice, invalid_utf8_err, non_empty_slice, slice_from_slice};
 use arrow_schema::ArrowError;
+use std::{
+    num::TryFromIntError,
+    ops::{Index, Range},
+    str,
+};
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            // TODO: Do this one
+            Three => todo!(),
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
+        };
+        Ok(result)
+    }
+}
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) struct VariantMetadataHeader {
+    version: u8,
+    is_sorted: bool,
+    /// Note: This is `offset_size_minus_one` + 1
+    offset_size: OffsetSizeBytes,
+}
+
+impl<'m> VariantMetadataHeader {
+    /// Tries to construct the variant metadata header, which has the form
+    ///              7     6  5   4  3             0
+    ///             +-------+---+---+---------------+
+    /// header      |       |   |   |    version    |
+    ///             +-------+---+---+---------------+
+    ///                 ^         ^
+    ///                 |         +-- sorted_strings
+    ///                 +-- offset_size_minus_one
+    /// The version is a 4-bit value that must always contain the value 1.
+    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
+    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
+    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let Some(header) = bytes.get(0) else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Received zero bytes".to_string(),
+            ));
+        };
+
+        let version = header & 0x0F; // First four bits

Review Comment:
   We need to validate that it "must always contain the value 1".
   
   This is in order to detect the incompatibility if an old variant reader encounters bytes produced by a newer variant spec.
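
   A sketch of that check (the error text is illustrative):
    ```rust
    // Sketch: reject any version other than 1, per the spec.
    let version = header & 0x0F;
    if version != 1 {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Unsupported variant metadata version: {version} (expected 1)"
        )));
    }
    ```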



##########
parquet-variant/src/variant.rs:
##########
@@ -1,14 +1,105 @@
-use std::ops::Index;
-
 use crate::decoder::{
    self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
+use crate::utils::{array_from_slice, invalid_utf8_err, non_empty_slice, slice_from_slice};
 use arrow_schema::ArrowError;
+use std::{
+    num::TryFromIntError,
+    ops::{Index, Range},
+    str,
+};
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            // TODO: Do this one
+            Three => todo!(),
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
+        };
+        Ok(result)
+    }
+}
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) struct VariantMetadataHeader {
+    version: u8,
+    is_sorted: bool,
+    /// Note: This is `offset_size_minus_one` + 1
+    offset_size: OffsetSizeBytes,
+}
+
+impl<'m> VariantMetadataHeader {
+    /// Tries to construct the variant metadata header, which has the form
+    ///              7     6  5   4  3             0
+    ///             +-------+---+---+---------------+
+    /// header      |       |   |   |    version    |
+    ///             +-------+---+---+---------------+
+    ///                 ^         ^
+    ///                 |         +-- sorted_strings
+    ///                 +-- offset_size_minus_one
+    /// The version is a 4-bit value that must always contain the value 1.
+    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
+    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
+    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let Some(header) = bytes.get(0) else {

Review Comment:
   Another potential call site for `non_empty_slice` (or `first_byte_from_slice`)
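
   With the `first_byte_from_slice` helper proposed in the utils.rs thread, this would collapse to something like (sketch):
    ```rust
    // Sketch: assumes the proposed `first_byte_from_slice` helper exists.
    let header = first_byte_from_slice(bytes)?;
    ```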



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));

Review Comment:
   I'm starting to suspect that `scan` was not the right approach here, sorry about that...
   ```suggestion
           let mut prev = None;
        for offset in (0..=dict_size).map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1)) {
               let offset = offset?;
               if prev.is_some_and(|prev| prev >= offset) {
                   return Err(...); // non-monotonic
               }
               prev = Some(offset);
   ```



##########
parquet-variant/src/variant.rs:
##########
@@ -1,14 +1,105 @@
-use std::ops::Index;
-
 use crate::decoder::{
    self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
+use crate::utils::{array_from_slice, invalid_utf8_err, non_empty_slice, slice_from_slice};
 use arrow_schema::ArrowError;
+use std::{
+    num::TryFromIntError,
+    ops::{Index, Range},
+    str,
+};
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            // TODO: Do this one
+            Three => todo!(),
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
+        };
+        Ok(result)
+    }
+}
+
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) struct VariantMetadataHeader {
+    version: u8,
+    is_sorted: bool,
+    /// Note: This is `offset_size_minus_one` + 1
+    offset_size: OffsetSizeBytes,
+}
+
+impl<'m> VariantMetadataHeader {
+    /// Tries to construct the variant metadata header, which has the form
+    ///              7     6  5   4  3             0
+    ///             +-------+---+---+---------------+
+    /// header      |       |   |   |    version    |
+    ///             +-------+---+---+---------------+
+    ///                 ^         ^
+    ///                 |         +-- sorted_strings
+    ///                 +-- offset_size_minus_one
+    /// The version is a 4-bit value that must always contain the value 1.
+    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
+    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
+    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let Some(header) = bytes.get(0) else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Received zero bytes".to_string(),
+            ));
+        };
+
+        let version = header & 0x0F; // First four bits
+        let is_sorted = (header & 0x10) != 0; // Fifth bit
+        let offset_size_minus_one = (header >> 6) & 0x03; // Last two bits

Review Comment:
   mask not needed, `>>` on u8 shifts in zeros.
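
   i.e. (sketch):
    ```rust
    // Sketch: shifting a u8 right fills the vacated high bits with zeros,
    // so the top two bits land in the low two positions without a mask.
    let offset_size_minus_one = header >> 6;
    ```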



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.

Review Comment:
   Why is `index` wrong, sorry? It seems right?



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)

Review Comment:
   seems like we could simplify a bit here?
   ```suggestion
        let unpack = |i| self.header.offset_size.unpack_usize(self.bytes, 1, i + 1);
           Ok(unpack(index)?..unpack(index + 1)?)
   ```



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),

Review Comment:
   Isn't this just:
   ```suggestion
           let range = self.get_offset_by(index)?;
           self.get_field_by_offset(range)
   ```



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = 
usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),
         }
-        Ok(self.bytes[0])
     }
 
-    /// Get the offset_minus_one value from the header
-    pub fn offset_size_minus_one(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            Err(ArrowError::InvalidArgumentError(
-                "Tried to get offset_size_minus_one from header, but 
self.bytes buffer is emtpy."
-                    .to_string(),
-            ))
-        } else {
-            Ok(self.bytes[0] & (0b11 << 6)) // Grab the last 2 bits
-        }
+    /// Gets the field using an offset (Range) - helper method to keep consistent API.
+    pub fn get_field_by_offset(&self, offset: Range<usize>) -> Result<&'m str, ArrowError> {
+        let dictionary_key_start_byte = 1 // header
+                    + self.header.offset_size as usize // dictionary_size field itself
+                    + (self.dict_size + 1) * (self.header.offset_size as usize); // all offset entries

Review Comment:
   This seems worth paying a u32 to memoize?
   (it always fits in u32, but we could also burn a usize for convenience if we wanted to)
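
   For illustration, a hedged sketch of how that memoization could look; the `first_key_byte` field name is ours, not the PR's:
   ```rust
   pub struct VariantMetadata<'m> {
       bytes: &'m [u8],
       header: VariantMetadataHeader,
       dict_size: usize,
       /// Cached offset of the first dictionary-key byte, computed once in
       /// `try_new`; it fits in u32 since offsets are at most 4 bytes wide.
       first_key_byte: u32,
   }

   // ...in try_new, after decoding `header` and `dict_size`:
   // let first_key_byte = (1 + (dict_size + 2) * header.offset_size as usize) as u32;
   ```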



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = 
usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),
         }
-        Ok(self.bytes[0])
     }
 
-    /// Get the offset_minus_one value from the header
-    pub fn offset_size_minus_one(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            Err(ArrowError::InvalidArgumentError(
-                "Tried to get offset_size_minus_one from header, but 
self.bytes buffer is emtpy."
-                    .to_string(),
-            ))
-        } else {
-            Ok(self.bytes[0] & (0b11 << 6)) // Grab the last 2 bits
-        }
+    /// Gets the field using an offset (Range) - helper method to keep consistent API.
+    pub fn get_field_by_offset(&self, offset: Range<usize>) -> Result<&'m str, ArrowError> {
+        let dictionary_key_start_byte = 1 // header
+                    + self.header.offset_size as usize // dictionary_size field itself
+                    + (self.dict_size + 1) * (self.header.offset_size as usize); // all offset entries
+        let dictionary_keys_bytes =
+            slice_from_slice(self.bytes, dictionary_key_start_byte..self.bytes.len())?;
+        let dictionary_key_bytes =
+            slice_from_slice(dictionary_keys_bytes, offset.start..offset.end)?;
+        let result = str::from_utf8(dictionary_key_bytes).map_err(|_| invalid_utf8_err())?;
+        Ok(result)
     }
 
-    /// Get the offset_size
-    pub fn offset_size(&self) -> Result<u8, ArrowError> {
-        Ok(self.offset_size_minus_one()? + 1)
+    pub fn header(&self) -> VariantMetadataHeader {
+        self.header
     }
 
     /// Get the offsets as an iterator
-    // TODO: Do we want this kind of API?
-    // TODO: Test once API is agreed upon
-    pub fn offsets(&'m self) -> Result<impl Iterator<Item = (usize, usize)> + 'm, ArrowError> {
+    // TODO: Write tests
+    pub fn offsets(
+        &'m self,

Review Comment:
   It probably doesn't matter, but `self` technically has a different (shorter, or at least not longer) lifetime than `'m`.
   Should we track the two lifetimes?
   Or at least give the one lifetime a different name, to make clear it's not actually `'m`?
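
   For illustration, a hedged sketch of what tracking a separate borrow lifetime might look like (the `'s` name is ours, and the body is a placeholder, not the PR's):
   ```rust
   // Inside impl<'m> VariantMetadata<'m>: 's is the lifetime of the borrow of
   // `self`; 'm: 's is implied by the receiver type, so items can still hand
   // out &'m data while the iterator itself only lives for 's.
   pub fn offsets<'s>(
       &'s self,
   ) -> Result<impl Iterator<Item = Result<Range<usize>, ArrowError>> + 's, ArrowError> {
       // body unchanged from the PR; an empty iterator stands in here
       Ok(std::iter::empty())
   }
   ```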



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = 
usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),
         }
-        Ok(self.bytes[0])
     }
 
-    /// Get the offset_minus_one value from the header
-    pub fn offset_size_minus_one(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            Err(ArrowError::InvalidArgumentError(
-                "Tried to get offset_size_minus_one from header, but 
self.bytes buffer is emtpy."
-                    .to_string(),
-            ))
-        } else {
-            Ok(self.bytes[0] & (0b11 << 6)) // Grab the last 2 bits
-        }
+    /// Gets the field using an offset (Range) - helper method to keep consistent API.
+    pub fn get_field_by_offset(&self, offset: Range<usize>) -> Result<&'m str, ArrowError> {
+        let dictionary_key_start_byte = 1 // header
+                    + self.header.offset_size as usize // dictionary_size field itself
+                    + (self.dict_size + 1) * (self.header.offset_size as usize); // all offset entries
+        let dictionary_keys_bytes =
+            slice_from_slice(self.bytes, dictionary_key_start_byte..self.bytes.len())?;
+        let dictionary_key_bytes =
+            slice_from_slice(dictionary_keys_bytes, offset.start..offset.end)?;
+        let result = str::from_utf8(dictionary_key_bytes).map_err(|_| invalid_utf8_err())?;
+        Ok(result)
     }
 
-    /// Get the offset_size
-    pub fn offset_size(&self) -> Result<u8, ArrowError> {
-        Ok(self.offset_size_minus_one()? + 1)
+    pub fn header(&self) -> VariantMetadataHeader {
+        self.header
     }
 
     /// Get the offsets as an iterator
-    // TODO: Do we want this kind of API?
-    // TODO: Test once API is agreed upon
-    pub fn offsets(&'m self) -> Result<impl Iterator<Item = (usize, usize)> + 'm, ArrowError> {
+    // TODO: Write tests
+    pub fn offsets(
+        &'m self,
+    ) -> Result<impl Iterator<Item = Result<Range<usize>, ArrowError>> + 'm, ArrowError> {
         struct OffsetIterators<'m> {
             buffer: &'m [u8],
+            header: &'m VariantMetadataHeader,
             dict_len: usize,
             seen: usize,
-            offset_size: usize,
         }
         impl<'m> Iterator for OffsetIterators<'m> {
-            type Item = (usize, usize); // (start, end) positions of the bytes
-
-            // TODO: Check bounds here or ensure they're correct
-
+            type Item = Result<Range<usize>, ArrowError>; // Range = (start, end) positions of the bytes
             fn next(&mut self) -> Option<Self::Item> {
-                // +1 to skip the first offset
                 if self.seen < self.dict_len {
-                    let start = usize::from_le_bytes(
-                        self.buffer[(self.seen ) * self.offset_size + 1 // +1 to skip header
-                            ..(self.seen ) * self.offset_size + 1]
-                            .try_into()
-                            .ok()?,
-                    );
+                    let start = self
+                        .header
+                        .offset_size
+                        // skip header via byte_offset=1 and self.seen + 1 because first is dictionary_size
+                        .unpack_usize(self.buffer, 1, self.seen + 1);

Review Comment:
   Here's that same double adjacent unpack again...

   But we already use a similar iterator in `VariantMetadata::try_new`, which (see comment there) can be simplified to just:
   ```rust
   (0..=dict_size).map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
   ```
   And here we just need it to be peekable:
   ```rust
       pub fn offsets(&self) -> impl Iterator<Item = Result<Range<usize>, ArrowError>> + 'm {
           let offset_size = self.header.offset_size; // it's tiny => make it `Copy`
           let bytes = self.bytes;
           let mut offsets = (0..=self.dict_size)
               .map(move |i| offset_size.unpack_usize(bytes, 1, i + 1))
               .peekable();
           std::iter::from_fn(move || {
               let start = match offsets.next()? {
                   Ok(start) => start,
                   Err(e) => return Some(Err(e)),
               };
               match offsets.peek()? {
                   Ok(end) => Some(Ok(start..*end)),
                   // peek only hands out a reference, so take the error by value
                   Err(_) => Some(Err(offsets.next()?.unwrap_err())),
               }
           })
       }
   ```
   BTW, iterator creation itself seems to be infallible, which can simplify the return type?

   As a bonus, not keeping a reference to self means the `'m` lifetime is again accurate.
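
   The same pairwise-windowing idea, checked standalone on plain data (illustration only, not PR code): adjacent offsets `[0, 3, 5, 9]` become the ranges `0..3`, `3..5`, `5..9`:
   ```rust
   fn main() {
       let offsets = [0usize, 3, 5, 9];
       let mut it = offsets.iter().copied().peekable();
       let ranges: Vec<_> = std::iter::from_fn(move || {
           let start = it.next()?;
           let end = *it.peek()?; // stop once no "next" offset remains
           Some(start..end)
       })
       .collect();
       assert_eq!(ranges, vec![0..3, 3..5, 5..9]);
   }
   ```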
   



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));
+        }
+        Ok(Self {
+            bytes,
+            header,
+            dict_size,
+        })
+    }
+
     /// Whether the dictionary keys are sorted and unique
     pub fn is_sorted(&self) -> bool {
-        todo!()
+        self.header.is_sorted
     }
 
-    /// Get the dict length
-    pub fn dict_len(&self) -> Result<usize, ArrowError> {
-        let end_location = self.offset_size()? as usize + 1;
-        if self.bytes.len() < end_location {
-            let err_str = format!(
-                "Invalid metadata bytes, must have at least length {} but has 
length {}",
-                &end_location,
-                self.bytes.len()
-            );
-            return Err(ArrowError::InvalidArgumentError(err_str));
-        }
-        let dict_len_bytes = &self.bytes[1..end_location];
-        let dict_len = 
usize::from_le_bytes(dict_len_bytes.try_into().map_err(|e| {
-            ArrowError::InvalidArgumentError(format!(
-                "Unable to convert dictionary_size bytes into usize: {}",
-                e,
-            ))
-        })?);
-        Ok(dict_len)
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
     }
-    pub fn version(&self) -> usize {
-        todo!()
+    pub fn version(&self) -> u8 {
+        self.header.version
     }
 
-    /// Get the offset by index
-    pub fn get_offset_by(&self, index: usize) -> Result<usize, ArrowError> {
-        todo!()
+    /// Get the offset by key-index
+    pub fn get_offset_by(&self, index: usize) -> Result<Range<usize>, ArrowError> {
+        // TODO: Should we memoize the offsets? There could be thousands of them (https://github.com/apache/arrow-rs/pull/7535#discussion_r2101351294)
+        if index >= self.dict_size {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for dictionary of length {}",
+                index, self.dict_size
+            )));
+        }
+
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        // TODO: Validate size before looking up?
+        // TODO: Fix location / bytes here, the index is wrong.
+        let start = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 1)?;
+        let end = self
+            .header
+            .offset_size
+            .unpack_usize(self.bytes, 1, index + 2)?;
+        Ok(start..end)
     }
 
-    /// Get the header byte, which has the following form
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub fn header(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            return Err(ArrowError::InvalidArgumentError(
-                "Can't get header from empty buffer".to_string(),
-            ));
+    /// Get the key-name by index
+    pub fn get_field_by_index(&self, index: usize) -> Result<&'m str, ArrowError> {
+        match self.get_offset_by(index) {
+            Ok(range) => self.get_field_by_offset(range),
+            Err(e) => Err(e),
         }
-        Ok(self.bytes[0])
     }
 
-    /// Get the offset_minus_one value from the header
-    pub fn offset_size_minus_one(&self) -> Result<u8, ArrowError> {
-        if self.bytes.is_empty() {
-            Err(ArrowError::InvalidArgumentError(
-                "Tried to get offset_size_minus_one from header, but 
self.bytes buffer is emtpy."
-                    .to_string(),
-            ))
-        } else {
-            Ok(self.bytes[0] & (0b11 << 6)) // Grab the last 2 bits
-        }
+    /// Gets the field using an offset (Range) - helper method to keep consistent API.
+    pub fn get_field_by_offset(&self, offset: Range<usize>) -> Result<&'m str, ArrowError> {
+        let dictionary_key_start_byte = 1 // header
+                    + self.header.offset_size as usize // dictionary_size field itself
+                    + (self.dict_size + 1) * (self.header.offset_size as usize); // all offset entries
+        let dictionary_keys_bytes =
+            slice_from_slice(self.bytes, dictionary_key_start_byte..self.bytes.len())?;

Review Comment:
   ```suggestion
           let dictionary_keys_bytes = slice_from_slice(self.bytes, dictionary_key_start_byte..)?;
   ```
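
   (As written, `slice_from_slice` takes a `Range<usize>`, so this open-ended `start..` would also need the helper generalized over slice indices. A hedged sketch of one way that could look, not part of the PR:)
   ```rust
   use std::fmt::Debug;
   use std::slice::SliceIndex;

   use arrow_schema::ArrowError;

   /// Hypothetical generalization: accept any slice index (`a..b`, `a..`, ...)
   /// and still return an error instead of panicking on out-of-bounds access.
   pub(crate) fn slice_from_slice<I>(bytes: &[u8], index: I) -> Result<&[u8], ArrowError>
   where
       I: SliceIndex<[u8], Output = [u8]> + Clone + Debug,
   {
       let index_for_error = index.clone(); // keep a copy for the error message
       bytes.get(index).ok_or_else(move || {
           ArrowError::InvalidArgumentError(format!(
               "Tried to extract index {index_for_error:?} from a slice of length {}",
               bytes.len()
           ))
       })
   }
   ```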



##########
parquet-variant/src/variant.rs:
##########
@@ -18,158 +109,223 @@ impl<'m> VariantMetadata<'m> {
         self.bytes
     }
 
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header = VariantMetadataHeader::try_new(bytes)?;
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // TODO: Refactor, add test for validation
+        let valid = (0..=dict_size)
+            .map(|i| header.offset_size.unpack_usize(bytes, 1, i + 1))
+            .scan(0, |prev, cur| {
+                let Ok(cur_offset) = cur else {
+                    return Some(false);
+                };
+                // Skip the first offset, which is always 0
+                if *prev == 0 {
+                    *prev = cur_offset;
+                    return Some(true);
+                }
+
+                let valid = cur_offset > *prev;
+                *prev = cur_offset;
+                Some(valid)
+            })
+            .all(|valid| valid);
+
+        if !valid {
+            return Err(ArrowError::InvalidArgumentError(
+                "Offsets are not monotonically increasing".to_string(),
+            ));

Review Comment:
   There's also an argument to be made that we don't need to pre-validate offsets at all. We don't memoize them while checking, so any later access has to redo the fallible unpack operation anyway. And `slice::get` already handles "backward" ranges such as `10..1`, so we don't need to protect against those either.
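
   For reference, a quick standalone check of that `slice::get` behavior (illustration only, not PR code):
   ```rust
   fn main() {
       let bytes = [0u8; 16];
       // A "backward" range yields None instead of panicking,
       assert!(bytes.get(10..1).is_none());
       // as does an out-of-bounds range,
       assert!(bytes.get(4..20).is_none());
       // while a well-formed range yields the expected subslice.
       assert_eq!(bytes.get(2..4), Some(&bytes[2..4]));
   }
   ```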


