clbarnes commented on code in PR #5222:
URL: https://github.com/apache/arrow-rs/pull/5222#discussion_r1439521309


##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>], 
coalesce: usize) -> Vec<std::
     ret
 }
 
/// A single range in a `Range` request.
///
/// These can be created from [usize] ranges, like
///
/// ```rust
/// # use object_store::GetRange;
/// let range1: GetRange = (50..150).into();
/// let range2: GetRange = (50..=150).into();
/// let range3: GetRange = (50..).into();
/// let range4: GetRange = (..150).into();
/// ```
///
/// Bounded ranges are stored half-open (`start..end`, `end` exclusive), so
/// `50..=150` becomes `Bounded(50..151)` and `..150` becomes `Bounded(0..150)`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum GetRange {
    /// A half-open range `start..end`: the first byte requested is `start` and the
    /// last byte requested is `end - 1`.
    Bounded(Range<usize>),
    /// A range defined only by the first byte requested (requests all remaining bytes).
    Offset(usize),
    /// A range defined as the number of bytes at the end of the resource.
    Suffix(usize),
}
+
+impl PartialOrd for GetRange {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        use std::cmp::Ordering::*;
+        use GetRange::*;
+        // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+        // `Range`s and `Offset`s are compared by their first byte,
+        // using the last byte as a tiebreaker (`Offset`s always greater than 
`Range`s).
+        match self {
+            Bounded(r1) => match other {
+                Bounded(r2) => match r1.start.cmp(&r2.start) {
+                    Equal => Some(r1.end.cmp(&r2.end)),
+                    o => Some(o),
+                },
+                Offset(f2) => match r1.start.cmp(f2) {
+                    Equal => Some(Less),
+                    o => Some(o),
+                },
+                Suffix { .. } => None,
+            },
+            Offset(f1) => match other {
+                Bounded(r2) => match f1.cmp(&r2.start) {
+                    Equal => Some(Greater),
+                    o => Some(o),
+                },
+                Offset(f2) => Some(f1.cmp(f2)),
+                Suffix { .. } => None,
+            },
+            Suffix(b1) => match other {
+                Suffix(b2) => Some(b2.cmp(b1)),
+                _ => None,
+            },
+        }
+    }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+    #[snafu(display("Ranges could not be merged because exactly one was a 
suffix"))]
+    DifferentTypes,
+    #[snafu(display("Ranges could not be merged because they were too far 
apart"))]
+    Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+    #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B 
long"))]
+    SuffixTooLarge { expected: usize, actual: usize },
+
+    #[snafu(display("Wanted range starting at {expected}, resource was 
{actual}B long"))]
+    StartTooLarge { expected: usize, actual: usize },
+
+    #[snafu(display("Wanted range ending at {expected}, resource was {actual}B 
long"))]
+    EndTooLarge { expected: usize, actual: usize },
+
+    #[snafu(display("Range started at {start} and ended at {end}"))]
+    Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+    /// Create a range representing the whole resource.
+    pub fn new_whole() -> Self {
+        Self::Offset(0)
+    }
+
+    /// Whether this is an offset.
+    pub fn is_offset(&self) -> bool {
+        match self {
+            GetRange::Offset(_) => true,
+            _ => false,
+        }
+    }
+
+    /// Whether this is a range.
+    pub fn is_range(&self) -> bool {
+        match self {
+            GetRange::Bounded(_) => true,
+            _ => false,
+        }
+    }
+
+    /// Whether this is an offset.
+    pub fn is_suffix(&self) -> bool {
+        match self {
+            GetRange::Suffix(_) => true,
+            _ => false,
+        }
+    }
+
+    /// Whether the range has no bytes in it (i.e. the last byte is before the 
first).
+    ///
+    /// [None] if the range is an `Offset` or `Suffix`.
+    /// The response may still be empty.
+    pub fn is_empty(&self) -> Option<bool> {
+        match self {
+            GetRange::Bounded(r) => Some(r.is_empty()),
+            _ => None,
+        }
+    }
+
+    /// Whether the range represents the entire resource (i.e. it is an 
`Offset` of 0).
+    ///
+    /// [None] if the range is a `Range` or `Suffix`.
+    /// The response may still be the full resource.
+    pub fn is_whole(&self) -> Option<bool> {
+        match self {
+            GetRange::Offset(first) => Some(first == &0),
+            _ => None,
+        }
+    }
+
+    /// How many bytes the range is requesting.
+    ///
+    /// Note that the server may respond with a different number of bytes,
+    /// depending on the length of the resource and other behaviour.
+    pub fn nbytes(&self) -> Option<usize> {
+        match self {
+            GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+            GetRange::Offset(_) => None,
+            GetRange::Suffix(n) => Some(*n),
+        }
+    }
+
+    /// The index of the first byte requested ([None] for suffix).
+    pub fn first_byte(&self) -> Option<usize> {
+        match self {
+            GetRange::Bounded(r) => Some(r.start),
+            GetRange::Offset(o) => Some(*o),
+            GetRange::Suffix(_) => None,
+        }
+    }
+
+    /// The index of the last byte requested ([None] for offset or suffix).
+    pub fn last_byte(&self) -> Option<usize> {
+        match self {
+            GetRange::Bounded(r) => match r.end {
+                0 => None,
+                n => Some(n),
+            },
+            GetRange::Offset { .. } => None,
+            GetRange::Suffix { .. } => None,
+        }
+    }
+
+    pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>, 
InvalidGetRange> {
+        match self {
+            Self::Bounded(r) => {
+                if r.start >= len {
+                    Err(InvalidGetRange::StartTooLarge {
+                        expected: r.start,
+                        actual: len,
+                    })
+                } else if r.end <= r.start {
+                    Err(InvalidGetRange::Inconsistent {
+                        start: r.start,
+                        end: r.end,
+                    })
+                } else if r.end >= len {
+                    Err(InvalidGetRange::EndTooLarge {
+                        expected: r.end,
+                        actual: len,
+                    })
+                } else {
+                    Ok(r.clone())
+                }
+            }
+            Self::Offset(o) => {
+                if o >= &len {
+                    Err(InvalidGetRange::StartTooLarge {
+                        expected: *o,
+                        actual: len,
+                    })
+                } else {
+                    Ok(*o..len)
+                }
+            }
+            Self::Suffix(n) => {
+                len.checked_sub(*n)
+                    .map(|start| start..len)
+                    .ok_or(InvalidGetRange::SuffixTooLarge {
+                        expected: *n,
+                        actual: len,
+                    })
+            }
+        }
+    }
+
+    /// Merge two ranges which fall within a certain distance `coalesce` of 
each other.
+    ///
+    /// Error if exactly one is a suffix or if the ranges are too far apart.
+    pub(crate) fn try_merge(
+        &self,
+        other: &GetRange,
+        coalesce: usize,
+    ) -> Result<Self, RangeMergeError> {
+        use GetRange::*;
+
+        let (g1, g2) = match self.partial_cmp(other) {
+            None => {
+                // One is a suffix, the other isn't.
+                // This is escapable if one represents the whole resource.
+                if let Some(whole) = self.is_whole() {
+                    if whole {
+                        return Ok(GetRange::new_whole());
+                    }
+                }
+                if let Some(whole) = other.is_whole() {
+                    if whole {
+                        return Ok(GetRange::new_whole());
+                    }
+                }
+                return Err(RangeMergeError::DifferentTypes);
+            }
+            Some(o) => match o {
+                std::cmp::Ordering::Greater => (other, self),
+                _ => (self, other),
+            },
+        };
+
+        match g1 {
+            Bounded(r1) => match g2 {
+                Bounded(r2) => {
+                    if r2.start <= r1.end.saturating_add(coalesce) {
+                        Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end)))
+                    } else {
+                        Err(RangeMergeError::Spread)
+                    }
+                }
+                Offset(first) => {
+                    if first < &(r1.end.saturating_add(coalesce)) {
+                        Ok(GetRange::Offset(r1.start))
+                    } else {
+                        Err(RangeMergeError::Spread)
+                    }
+                }
+                Suffix { .. } => unreachable!(),
+            },
+            // Either an offset or suffix, both of which would contain the 
second range.
+            _ => Ok(g1.clone()),
+        }
+    }
+
+    pub fn matches_range(&self, range: Range<usize>, len: usize) -> bool {
+        match self {
+            Self::Bounded(r) => r == &range,
+            Self::Offset(o) => o == &range.start && len == range.end,
+            Self::Suffix(n) => range.end == len && range.start == len - n,
+        }
+    }
+}
+
+/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that 
cover `ranges`,
+/// and a single [HttpRange::Suffix] if one or more are given.
+/// The suffix is also omitted if any of the ranges is the whole resource 
(`0-`).
+///
+/// The suffix is returned separately as it may still overlap with the other 
ranges,
+/// so the caller may want to handle it differently.
+pub fn merge_get_ranges<T: Into<GetRange> + Clone>(
+    ranges: &[T],
+    coalesce: usize,
+) -> (Vec<GetRange>, Option<GetRange>) {
+    if ranges.is_empty() {
+        return (vec![], None);
+    }
+    let mut v = Vec::with_capacity(ranges.len());
+    let mut o = None::<usize>;
+
+    for rng in ranges.iter().cloned().map(|r| r.into()) {
+        match rng {
+            GetRange::Suffix(n) => {
+                if let Some(suff) = o {
+                    o = Some(suff.max(n));
+                } else {
+                    o = Some(n);
+                }
+            }
+            _ => v.push(rng),
+        }
+    }
+
+    let mut ret = Vec::with_capacity(v.len() + 1);
+    let suff = o.map(|s| GetRange::Suffix(s));
+
+    if v.is_empty() {
+        if let Some(s) = suff.as_ref() {
+            ret.push(s.clone());
+        }
+        return (ret, suff);
+    }
+
+    // unwrap is fine because we've already filtered out the suffixes
+    v.sort_by(|a, b| a.partial_cmp(b).unwrap());
+    let mut it = v.into_iter();
+    let mut prev = it.next().unwrap();
+
+    for curr in it {
+        match prev {
+            GetRange::Offset { .. } => {
+                match prev.try_merge(&curr, coalesce) {
+                    Ok(r) => ret.push(r),
+                    Err(_) => {
+                        ret.push(prev);
+                        ret.push(curr);
+                    }
+                }
+
+                let Some(s) = suff else {
+                    return (ret, None);
+                };
+
+                if ret.last().unwrap().is_whole().unwrap() {
+                    return (ret, None);
+                } else {
+                    return (ret, Some(s));
+                }
+            }
+            GetRange::Bounded { .. } => match prev.try_merge(&curr, coalesce) {
+                Ok(r) => ret.push(r),
+                Err(_) => {
+                    ret.push(prev);
+                    ret.push(curr.clone());
+                }
+            },
+            GetRange::Suffix { .. } => unreachable!(),
+        }
+        prev = curr;
+    }
+
+    (ret, suff)
+}
+
+impl Display for GetRange {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            GetRange::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start, 
r.end - 1)),
+            GetRange::Offset(o) => f.write_fmt(format_args!("{o}-")),
+            GetRange::Suffix(n) => f.write_fmt(format_args!("-{n}")),
+        }
+    }
+}
+
+impl<T: RangeBounds<usize>> From<T> for GetRange {
+    fn from(value: T) -> Self {
+        use std::ops::Bound::*;
+        let first = match value.start_bound() {
+            Included(i) => *i,
+            Excluded(i) => i + 1,
+            Unbounded => 0,
+        };
+        match value.end_bound() {
+            Included(i) => GetRange::Bounded(first..(i + 1)),
+            Excluded(i) => GetRange::Bounded(first..*i),
+            Unbounded => GetRange::Offset(first),
+        }
+    }
+}
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to 
`fetch`
+/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
+pub async fn coalesce_get_ranges<F, E, Fut>(
+    ranges: &[GetRange],
+    fetch: F,
+    coalesce: usize,
+) -> Result<Vec<Bytes>, E>
+where
+    F: Send + FnMut(GetRange) -> Fut,
+    E: Send,
+    Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
+{
+    let (mut fetch_ranges, suff_opt) = merge_get_ranges(ranges, coalesce);
+    if let Some(suff) = suff_opt.as_ref() {
+        fetch_ranges.push(suff.clone());
+    }
+    if fetch_ranges.is_empty() {
+        return Ok(vec![]);
+    }
+
+    let mut fetched: Vec<_> = 
futures::stream::iter(fetch_ranges.iter().cloned())
+        .map(fetch)
+        .buffered(OBJECT_STORE_COALESCE_PARALLEL)
+        .try_collect()
+        .await?;
+
+    let suff = suff_opt.map(|r| {
+        let nbytes = match r {
+            GetRange::Suffix(n) => n,
+            _ => unreachable!(),
+        };
+        fetch_ranges.pop().unwrap();
+        let b = fetched.pop().unwrap();
+        if nbytes >= b.len() {
+            b
+        } else {
+            b.slice((b.len() - nbytes)..)
+        }
+    });
+
+    Ok(ranges
+        .iter()
+        .map(|range| {
+            match range {
+                GetRange::Suffix(n) => {
+                    let b = suff.as_ref().unwrap();
+                    let start = b.len().saturating_sub(*n);
+                    return b.slice(start..b.len());
+                }
+                _ => (),
+            }
+            // unwrapping range.first_byte() is fine because we've 
early-returned suffixes
+            let idx = fetch_ranges
+                .partition_point(|v| v.first_byte().unwrap() <= 
range.first_byte().unwrap())
+                - 1;
+            let fetch_range = &fetch_ranges[idx];
+            let fetch_bytes = &fetched[idx];
+
+            let start = range.first_byte().unwrap() - 
fetch_range.first_byte().unwrap();
+            let end = range.last_byte().map_or(fetch_bytes.len(), |range_last| 
{
+                fetch_bytes
+                    .len()
+                    .max(range_last - fetch_range.first_byte().unwrap() + 1)
+            });
+            fetch_bytes.slice(start..end)
+        })
+        .collect())
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidRangeResponse {
+    #[snafu(display("Response was not PARTIAL_CONTENT; length {length:?}"))]
+    NotPartial { length: Option<usize> },
+    #[snafu(display("Content-Range header not present"))]
+    NoContentRange,
+    #[snafu(display("Content-Range header could not be parsed: {value:?}"))]
+    InvalidContentRange { value: Vec<u8> },
+}
+
+pub(crate) fn response_range(r: &Response) -> Result<Range<usize>, 
InvalidRangeResponse> {
+    use InvalidRangeResponse::*;
+
+    if r.status() != StatusCode::PARTIAL_CONTENT {
+        return Err(NotPartial {
+            length: r.content_length().map(|s| s as usize),
+        });
+    }
+
+    let val_bytes = r
+        .headers()
+        .get(CONTENT_RANGE)
+        .ok_or(NoContentRange)?
+        .as_bytes();
+
+    match ContentRange::parse_bytes(val_bytes) {

Review Comment:
   A well-formed Content-Range header is very simple. However, strictly speaking, 
headers can contain basically anything and there's no guarantee this one is 
well-formed, so I chose to push that complexity down into the external lib. 
Alternatively, we can keep the parsing logic internal and just raise a very 
broad error for anything that doesn't parse.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to