tustvold commented on code in PR #5222:
URL: https://github.com/apache/arrow-rs/pull/5222#discussion_r1431295926
##########
object_store/src/local.rs:
##########
@@ -416,9 +419,16 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;
+ let range = if let Some(r) = options.range {
+ r.as_range(meta.size)
+ .map_err(|e| as_generic_err(STORE, e))?
Review Comment:
Perhaps the more "idiomatic" way to do this would be to define a new Error
variant for this module, e.g.
```
UnexpectedRange { source: UnexpectedRange }
```
And then this would be `.context(UnexpectedRangeSnafu)`
##########
object_store/Cargo.toml:
##########
@@ -54,6 +54,7 @@ reqwest = { version = "0.11", default-features = false,
features = ["rustls-tls-
ring = { version = "0.17", default-features = false, features = ["std"],
optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time",
"io-util"] }
+http-content-range = "0.1.2"
Review Comment:
We typically try very hard to avoid additional dependencies, especially ones
that don't have very obvious long-term maintenance stories (don't have some
sort of community backing them). Is this strictly necessary?
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
Review Comment:
See the comments below, but I would be inclined not to include this for now.
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
+
+ pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>,
InvalidGetRange> {
+ match self {
+ Self::Bounded(r) => {
+ if r.start >= len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: r.start,
+ actual: len,
+ })
+ } else if r.end <= r.start {
+ Err(InvalidGetRange::Inconsistent {
+ start: r.start,
+ end: r.end,
+ })
+ } else if r.end >= len {
+ Err(InvalidGetRange::EndTooLarge {
+ expected: r.end,
+ actual: len,
+ })
+ } else {
+ Ok(r.clone())
+ }
+ }
+ Self::Offset(o) => {
+ if o >= &len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: *o,
+ actual: len,
+ })
+ } else {
+ Ok(*o..len)
+ }
+ }
+ Self::Suffix(n) => {
+ len.checked_sub(*n)
+ .map(|start| start..len)
+ .ok_or(InvalidGetRange::SuffixTooLarge {
+ expected: *n,
+ actual: len,
+ })
+ }
+ }
+ }
+
+ /// Merge two ranges which fall within a certain distance `coalesce` of
each other.
+ ///
+ /// Error if exactly one is a suffix or if the ranges are too far apart.
+ pub(crate) fn try_merge(
+ &self,
+ other: &GetRange,
+ coalesce: usize,
+ ) -> Result<Self, RangeMergeError> {
+ use GetRange::*;
+
+ let (g1, g2) = match self.partial_cmp(other) {
+ None => {
+ // One is a suffix, the other isn't.
+ // This is escapable if one represents the whole resource.
+ if let Some(whole) = self.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ if let Some(whole) = other.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ return Err(RangeMergeError::DifferentTypes);
+ }
+ Some(o) => match o {
+ std::cmp::Ordering::Greater => (other, self),
+ _ => (self, other),
+ },
+ };
+
+ match g1 {
+ Bounded(r1) => match g2 {
+ Bounded(r2) => {
+ if r2.start <= r1.end.saturating_add(coalesce) {
+ Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end)))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Offset(first) => {
+ if first < &(r1.end.saturating_add(coalesce)) {
+ Ok(GetRange::Offset(r1.start))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Suffix { .. } => unreachable!(),
+ },
+ // Either an offset or suffix, both of which would contain the
second range.
+ _ => Ok(g1.clone()),
+ }
+ }
+
+ pub fn matches_range(&self, range: Range<usize>, len: usize) -> bool {
+ match self {
+ Self::Bounded(r) => r == &range,
+ Self::Offset(o) => o == &range.start && len == range.end,
+ Self::Suffix(n) => range.end == len && range.start == len - n,
+ }
+ }
+}
+
+/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that
cover `ranges`,
+/// and a single [HttpRange::Suffix] if one or more are given.
+/// The suffix is also omitted if any of the ranges is the whole resource
(`0-`).
+///
+/// The suffix is returned separately as it may still overlap with the other
ranges,
+/// so the caller may want to handle it differently.
+pub fn merge_get_ranges<T: Into<GetRange> + Clone>(
+ ranges: &[T],
+ coalesce: usize,
+) -> (Vec<GetRange>, Option<GetRange>) {
+ if ranges.is_empty() {
+ return (vec![], None);
+ }
+ let mut v = Vec::with_capacity(ranges.len());
+ let mut o = None::<usize>;
+
+ for rng in ranges.iter().cloned().map(|r| r.into()) {
+ match rng {
+ GetRange::Suffix(n) => {
+ if let Some(suff) = o {
+ o = Some(suff.max(n));
+ } else {
+ o = Some(n);
+ }
+ }
+ _ => v.push(rng),
+ }
+ }
+
+ let mut ret = Vec::with_capacity(v.len() + 1);
+ let suff = o.map(|s| GetRange::Suffix(s));
+
+ if v.is_empty() {
+ if let Some(s) = suff.as_ref() {
+ ret.push(s.clone());
+ }
+ return (ret, suff);
+ }
+
+ // unwrap is fine because we've already filtered out the suffixes
+ v.sort_by(|a, b| a.partial_cmp(b).unwrap());
+ let mut it = v.into_iter();
+ let mut prev = it.next().unwrap();
+
+ for curr in it {
+ match prev {
+ GetRange::Offset { .. } => {
+ match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr);
+ }
+ }
+
+ let Some(s) = suff else {
+ return (ret, None);
+ };
+
+ if ret.last().unwrap().is_whole().unwrap() {
+ return (ret, None);
+ } else {
+ return (ret, Some(s));
+ }
+ }
+ GetRange::Bounded { .. } => match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr.clone());
+ }
+ },
+ GetRange::Suffix { .. } => unreachable!(),
+ }
+ prev = curr;
+ }
+
+ (ret, suff)
+}
+
+impl Display for GetRange {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ GetRange::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start,
r.end - 1)),
+ GetRange::Offset(o) => f.write_fmt(format_args!("{o}-")),
+ GetRange::Suffix(n) => f.write_fmt(format_args!("-{n}")),
+ }
+ }
+}
+
+impl<T: RangeBounds<usize>> From<T> for GetRange {
+ fn from(value: T) -> Self {
+ use std::ops::Bound::*;
+ let first = match value.start_bound() {
+ Included(i) => *i,
+ Excluded(i) => i + 1,
+ Unbounded => 0,
+ };
+ match value.end_bound() {
+ Included(i) => GetRange::Bounded(first..(i + 1)),
+ Excluded(i) => GetRange::Bounded(first..*i),
+ Unbounded => GetRange::Offset(first),
+ }
+ }
+}
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to
`fetch`
+/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
+pub async fn coalesce_get_ranges<F, E, Fut>(
Review Comment:
Do we strictly need this? It feels exceedingly niche to be selecting ranges
from a file without knowing how large the file is.
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
Review Comment:
Perhaps we could keep things simple and not include all these methods to
start with?
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
+
+ pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>,
InvalidGetRange> {
+ match self {
+ Self::Bounded(r) => {
+ if r.start >= len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: r.start,
+ actual: len,
+ })
+ } else if r.end <= r.start {
+ Err(InvalidGetRange::Inconsistent {
+ start: r.start,
+ end: r.end,
+ })
+ } else if r.end >= len {
+ Err(InvalidGetRange::EndTooLarge {
+ expected: r.end,
+ actual: len,
+ })
+ } else {
+ Ok(r.clone())
+ }
+ }
+ Self::Offset(o) => {
+ if o >= &len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: *o,
+ actual: len,
+ })
+ } else {
+ Ok(*o..len)
+ }
+ }
+ Self::Suffix(n) => {
+ len.checked_sub(*n)
+ .map(|start| start..len)
+ .ok_or(InvalidGetRange::SuffixTooLarge {
+ expected: *n,
+ actual: len,
+ })
+ }
+ }
+ }
+
+ /// Merge two ranges which fall within a certain distance `coalesce` of
each other.
+ ///
+ /// Error if exactly one is a suffix or if the ranges are too far apart.
+ pub(crate) fn try_merge(
+ &self,
+ other: &GetRange,
+ coalesce: usize,
+ ) -> Result<Self, RangeMergeError> {
+ use GetRange::*;
+
+ let (g1, g2) = match self.partial_cmp(other) {
+ None => {
+ // One is a suffix, the other isn't.
+ // This is escapable if one represents the whole resource.
+ if let Some(whole) = self.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ if let Some(whole) = other.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ return Err(RangeMergeError::DifferentTypes);
+ }
+ Some(o) => match o {
+ std::cmp::Ordering::Greater => (other, self),
+ _ => (self, other),
+ },
+ };
+
+ match g1 {
+ Bounded(r1) => match g2 {
+ Bounded(r2) => {
+ if r2.start <= r1.end.saturating_add(coalesce) {
+ Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end)))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Offset(first) => {
+ if first < &(r1.end.saturating_add(coalesce)) {
+ Ok(GetRange::Offset(r1.start))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Suffix { .. } => unreachable!(),
+ },
+ // Either an offset or suffix, both of which would contain the
second range.
+ _ => Ok(g1.clone()),
+ }
+ }
+
+ pub fn matches_range(&self, range: Range<usize>, len: usize) -> bool {
+ match self {
+ Self::Bounded(r) => r == &range,
+ Self::Offset(o) => o == &range.start && len == range.end,
+ Self::Suffix(n) => range.end == len && range.start == len - n,
+ }
+ }
+}
+
+/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that
cover `ranges`,
+/// and a single [HttpRange::Suffix] if one or more are given.
+/// The suffix is also omitted if any of the ranges is the whole resource
(`0-`).
+///
+/// The suffix is returned separately as it may still overlap with the other
ranges,
+/// so the caller may want to handle it differently.
+pub fn merge_get_ranges<T: Into<GetRange> + Clone>(
Review Comment:
Perhaps we could omit these for now? It seems pretty niche to me to be
selecting multiple ranges from a file without knowing its size, and the
behaviour is somewhat subtle.
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
+
+ pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>,
InvalidGetRange> {
+ match self {
+ Self::Bounded(r) => {
+ if r.start >= len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: r.start,
+ actual: len,
+ })
+ } else if r.end <= r.start {
+ Err(InvalidGetRange::Inconsistent {
+ start: r.start,
+ end: r.end,
+ })
+ } else if r.end >= len {
+ Err(InvalidGetRange::EndTooLarge {
+ expected: r.end,
+ actual: len,
+ })
+ } else {
+ Ok(r.clone())
+ }
+ }
+ Self::Offset(o) => {
+ if o >= &len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: *o,
+ actual: len,
+ })
+ } else {
+ Ok(*o..len)
+ }
+ }
+ Self::Suffix(n) => {
+ len.checked_sub(*n)
+ .map(|start| start..len)
+ .ok_or(InvalidGetRange::SuffixTooLarge {
+ expected: *n,
+ actual: len,
+ })
+ }
+ }
+ }
+
+ /// Merge two ranges which fall within a certain distance `coalesce` of
each other.
+ ///
+ /// Error if exactly one is a suffix or if the ranges are too far apart.
+ pub(crate) fn try_merge(
Review Comment:
See below; I would prefer not to provide this, as the behaviour is subtle and
the use cases are relatively limited.
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
+
+ pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>,
InvalidGetRange> {
+ match self {
+ Self::Bounded(r) => {
+ if r.start >= len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: r.start,
+ actual: len,
+ })
+ } else if r.end <= r.start {
+ Err(InvalidGetRange::Inconsistent {
+ start: r.start,
+ end: r.end,
+ })
+ } else if r.end >= len {
+ Err(InvalidGetRange::EndTooLarge {
+ expected: r.end,
+ actual: len,
+ })
+ } else {
+ Ok(r.clone())
+ }
+ }
+ Self::Offset(o) => {
+ if o >= &len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: *o,
+ actual: len,
+ })
+ } else {
+ Ok(*o..len)
+ }
+ }
+ Self::Suffix(n) => {
+ len.checked_sub(*n)
+ .map(|start| start..len)
+ .ok_or(InvalidGetRange::SuffixTooLarge {
+ expected: *n,
+ actual: len,
+ })
+ }
+ }
+ }
+
+ /// Merge two ranges which fall within a certain distance `coalesce` of
each other.
+ ///
+ /// Error if exactly one is a suffix or if the ranges are too far apart.
+ pub(crate) fn try_merge(
+ &self,
+ other: &GetRange,
+ coalesce: usize,
+ ) -> Result<Self, RangeMergeError> {
+ use GetRange::*;
+
+ let (g1, g2) = match self.partial_cmp(other) {
+ None => {
+ // One is a suffix, the other isn't.
+ // This is escapable if one represents the whole resource.
+ if let Some(whole) = self.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ if let Some(whole) = other.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ return Err(RangeMergeError::DifferentTypes);
+ }
+ Some(o) => match o {
+ std::cmp::Ordering::Greater => (other, self),
+ _ => (self, other),
+ },
+ };
+
+ match g1 {
+ Bounded(r1) => match g2 {
+ Bounded(r2) => {
+ if r2.start <= r1.end.saturating_add(coalesce) {
+ Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end)))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Offset(first) => {
+ if first < &(r1.end.saturating_add(coalesce)) {
+ Ok(GetRange::Offset(r1.start))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Suffix { .. } => unreachable!(),
+ },
+ // Either an offset or suffix, both of which would contain the
second range.
+ _ => Ok(g1.clone()),
+ }
+ }
+
+ pub fn matches_range(&self, range: Range<usize>, len: usize) -> bool {
+ match self {
+ Self::Bounded(r) => r == &range,
+ Self::Offset(o) => o == &range.start && len == range.end,
+ Self::Suffix(n) => range.end == len && range.start == len - n,
+ }
+ }
+}
+
+/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that
cover `ranges`,
+/// and a single [HttpRange::Suffix] if one or more are given.
+/// The suffix is also omitted if any of the ranges is the whole resource
(`0-`).
+///
+/// The suffix is returned separately as it may still overlap with the other
ranges,
+/// so the caller may want to handle it differently.
+pub fn merge_get_ranges<T: Into<GetRange> + Clone>(
+ ranges: &[T],
+ coalesce: usize,
+) -> (Vec<GetRange>, Option<GetRange>) {
+ if ranges.is_empty() {
+ return (vec![], None);
+ }
+ let mut v = Vec::with_capacity(ranges.len());
+ let mut o = None::<usize>;
+
+ for rng in ranges.iter().cloned().map(|r| r.into()) {
+ match rng {
+ GetRange::Suffix(n) => {
+ if let Some(suff) = o {
+ o = Some(suff.max(n));
+ } else {
+ o = Some(n);
+ }
+ }
+ _ => v.push(rng),
+ }
+ }
+
+ let mut ret = Vec::with_capacity(v.len() + 1);
+ let suff = o.map(|s| GetRange::Suffix(s));
+
+ if v.is_empty() {
+ if let Some(s) = suff.as_ref() {
+ ret.push(s.clone());
+ }
+ return (ret, suff);
+ }
+
+ // unwrap is fine because we've already filtered out the suffixes
+ v.sort_by(|a, b| a.partial_cmp(b).unwrap());
+ let mut it = v.into_iter();
+ let mut prev = it.next().unwrap();
+
+ for curr in it {
+ match prev {
+ GetRange::Offset { .. } => {
+ match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr);
+ }
+ }
+
+ let Some(s) = suff else {
+ return (ret, None);
+ };
+
+ if ret.last().unwrap().is_whole().unwrap() {
+ return (ret, None);
+ } else {
+ return (ret, Some(s));
+ }
+ }
+ GetRange::Bounded { .. } => match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr.clone());
+ }
+ },
+ GetRange::Suffix { .. } => unreachable!(),
+ }
+ prev = curr;
+ }
+
+ (ret, suff)
+}
+
+impl Display for GetRange {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ GetRange::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start,
r.end - 1)),
+ GetRange::Offset(o) => f.write_fmt(format_args!("{o}-")),
+ GetRange::Suffix(n) => f.write_fmt(format_args!("-{n}")),
+ }
+ }
+}
+
+impl<T: RangeBounds<usize>> From<T> for GetRange {
+ fn from(value: T) -> Self {
+ use std::ops::Bound::*;
+ let first = match value.start_bound() {
+ Included(i) => *i,
+ Excluded(i) => i + 1,
+ Unbounded => 0,
+ };
+ match value.end_bound() {
+ Included(i) => GetRange::Bounded(first..(i + 1)),
+ Excluded(i) => GetRange::Bounded(first..*i),
+ Unbounded => GetRange::Offset(first),
+ }
+ }
+}
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to
`fetch`
+/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
+pub async fn coalesce_get_ranges<F, E, Fut>(
+ ranges: &[GetRange],
+ fetch: F,
+ coalesce: usize,
+) -> Result<Vec<Bytes>, E>
+where
+ F: Send + FnMut(GetRange) -> Fut,
+ E: Send,
+ Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
+{
+ let (mut fetch_ranges, suff_opt) = merge_get_ranges(ranges, coalesce);
+ if let Some(suff) = suff_opt.as_ref() {
+ fetch_ranges.push(suff.clone());
+ }
+ if fetch_ranges.is_empty() {
+ return Ok(vec![]);
+ }
+
+ let mut fetched: Vec<_> =
futures::stream::iter(fetch_ranges.iter().cloned())
+ .map(fetch)
+ .buffered(OBJECT_STORE_COALESCE_PARALLEL)
+ .try_collect()
+ .await?;
+
+ let suff = suff_opt.map(|r| {
+ let nbytes = match r {
+ GetRange::Suffix(n) => n,
+ _ => unreachable!(),
+ };
+ fetch_ranges.pop().unwrap();
+ let b = fetched.pop().unwrap();
+ if nbytes >= b.len() {
+ b
+ } else {
+ b.slice((b.len() - nbytes)..)
+ }
+ });
+
+ Ok(ranges
+ .iter()
+ .map(|range| {
+ match range {
+ GetRange::Suffix(n) => {
+ let b = suff.as_ref().unwrap();
+ let start = b.len().saturating_sub(*n);
+ return b.slice(start..b.len());
+ }
+ _ => (),
+ }
+ // unwrapping range.first_byte() is fine because we've
early-returned suffixes
+ let idx = fetch_ranges
+ .partition_point(|v| v.first_byte().unwrap() <=
range.first_byte().unwrap())
+ - 1;
+ let fetch_range = &fetch_ranges[idx];
+ let fetch_bytes = &fetched[idx];
+
+ let start = range.first_byte().unwrap() -
fetch_range.first_byte().unwrap();
+ let end = range.last_byte().map_or(fetch_bytes.len(), |range_last|
{
+ fetch_bytes
+ .len()
+ .max(range_last - fetch_range.first_byte().unwrap() + 1)
+ });
+ fetch_bytes.slice(start..end)
+ })
+ .collect())
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidRangeResponse {
+ #[snafu(display("Response was not PARTIAL_CONTENT; length {length:?}"))]
+ NotPartial { length: Option<usize> },
+ #[snafu(display("Content-Range header not present"))]
+ NoContentRange,
+ #[snafu(display("Content-Range header could not be parsed: {value:?}"))]
+ InvalidContentRange { value: Vec<u8> },
+}
+
+pub(crate) fn response_range(r: &Response) -> Result<Range<usize>,
InvalidRangeResponse> {
+ use InvalidRangeResponse::*;
+
+ if r.status() != StatusCode::PARTIAL_CONTENT {
+ return Err(NotPartial {
+ length: r.content_length().map(|s| s as usize),
+ });
+ }
+
+ let val_bytes = r
+ .headers()
+ .get(CONTENT_RANGE)
+ .ok_or(NoContentRange)?
+ .as_bytes();
+
+ match ContentRange::parse_bytes(val_bytes) {
+ ContentRange::Bytes(c) => Ok(c.first_byte as usize..(c.last_byte as
usize + 1)),
+ ContentRange::UnboundBytes(c) => Ok(c.first_byte as
usize..(c.last_byte as usize + 1)),
+ _ => Err(InvalidContentRange {
+ value: val_bytes.to_vec(),
+ }),
+ }
+}
+
+pub(crate) fn as_generic_err<E: std::error::Error + Send + Sync + 'static>(
Review Comment:
As mentioned above, the more idiomatic way to handle this is to map the error
via the module-local error enumeration.
##########
object_store/src/util.rs:
##########
@@ -167,6 +178,501 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>],
coalesce: usize) -> Vec<std::
ret
}
+/// A single range in a `Range` request.
+///
+/// These can be created from [usize] ranges, like
+///
+/// ```rust
+/// # use byteranges::request::HttpRange;
+/// let range1: HttpRange = (50..150).into();
+/// let range2: HttpRange = (50..=150).into();
+/// let range3: HttpRange = (50..).into();
+/// let range4: HttpRange = (..150).into();
+/// ```
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum GetRange {
+ /// A range with a given first and last bytes.
+ Bounded(Range<usize>),
+ /// A range defined only by the first byte requested (requests all
remaining bytes).
+ Offset(usize),
+ /// A range defined as the number of bytes at the end of the resource.
+ Suffix(usize),
+}
+
+impl PartialOrd for GetRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ use std::cmp::Ordering::*;
+ use GetRange::*;
+ // `Suffix`es cannot be compared with `Range`s or `Offset`s.
+ // `Range`s and `Offset`s are compared by their first byte,
+ // using the last byte as a tiebreaker (`Offset`s always greater than
`Range`s).
+ match self {
+ Bounded(r1) => match other {
+ Bounded(r2) => match r1.start.cmp(&r2.start) {
+ Equal => Some(r1.end.cmp(&r2.end)),
+ o => Some(o),
+ },
+ Offset(f2) => match r1.start.cmp(f2) {
+ Equal => Some(Less),
+ o => Some(o),
+ },
+ Suffix { .. } => None,
+ },
+ Offset(f1) => match other {
+ Bounded(r2) => match f1.cmp(&r2.start) {
+ Equal => Some(Greater),
+ o => Some(o),
+ },
+ Offset(f2) => Some(f1.cmp(f2)),
+ Suffix { .. } => None,
+ },
+ Suffix(b1) => match other {
+ Suffix(b2) => Some(b2.cmp(b1)),
+ _ => None,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Snafu)]
+pub enum RangeMergeError {
+ #[snafu(display("Ranges could not be merged because exactly one was a
suffix"))]
+ DifferentTypes,
+ #[snafu(display("Ranges could not be merged because they were too far
apart"))]
+ Spread,
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidGetRange {
+ #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B
long"))]
+ SuffixTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range starting at {expected}, resource was
{actual}B long"))]
+ StartTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Wanted range ending at {expected}, resource was {actual}B
long"))]
+ EndTooLarge { expected: usize, actual: usize },
+
+ #[snafu(display("Range started at {start} and ended at {end}"))]
+ Inconsistent { start: usize, end: usize },
+}
+
+impl GetRange {
+ /// Create a range representing the whole resource.
+ pub fn new_whole() -> Self {
+ Self::Offset(0)
+ }
+
+ /// Whether this is an offset.
+ pub fn is_offset(&self) -> bool {
+ match self {
+ GetRange::Offset(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is a range.
+ pub fn is_range(&self) -> bool {
+ match self {
+ GetRange::Bounded(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether this is an offset.
+ pub fn is_suffix(&self) -> bool {
+ match self {
+ GetRange::Suffix(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Whether the range has no bytes in it (i.e. the last byte is before the
first).
+ ///
+ /// [None] if the range is an `Offset` or `Suffix`.
+ /// The response may still be empty.
+ pub fn is_empty(&self) -> Option<bool> {
+ match self {
+ GetRange::Bounded(r) => Some(r.is_empty()),
+ _ => None,
+ }
+ }
+
+ /// Whether the range represents the entire resource (i.e. it is an
`Offset` of 0).
+ ///
+ /// [None] if the range is a `Range` or `Suffix`.
+ /// The response may still be the full resource.
+ pub fn is_whole(&self) -> Option<bool> {
+ match self {
+ GetRange::Offset(first) => Some(first == &0),
+ _ => None,
+ }
+ }
+
+ /// How many bytes the range is requesting.
+ ///
+ /// Note that the server may respond with a different number of bytes,
+ /// depending on the length of the resource and other behaviour.
+ pub fn nbytes(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)),
+ GetRange::Offset(_) => None,
+ GetRange::Suffix(n) => Some(*n),
+ }
+ }
+
+ /// The index of the first byte requested ([None] for suffix).
+ pub fn first_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => Some(r.start),
+ GetRange::Offset(o) => Some(*o),
+ GetRange::Suffix(_) => None,
+ }
+ }
+
+ /// The index of the last byte requested ([None] for offset or suffix).
+ pub fn last_byte(&self) -> Option<usize> {
+ match self {
+ GetRange::Bounded(r) => match r.end {
+ 0 => None,
+ n => Some(n),
+ },
+ GetRange::Offset { .. } => None,
+ GetRange::Suffix { .. } => None,
+ }
+ }
+
+ pub(crate) fn as_range(&self, len: usize) -> Result<Range<usize>,
InvalidGetRange> {
+ match self {
+ Self::Bounded(r) => {
+ if r.start >= len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: r.start,
+ actual: len,
+ })
+ } else if r.end <= r.start {
+ Err(InvalidGetRange::Inconsistent {
+ start: r.start,
+ end: r.end,
+ })
+ } else if r.end >= len {
+ Err(InvalidGetRange::EndTooLarge {
+ expected: r.end,
+ actual: len,
+ })
+ } else {
+ Ok(r.clone())
+ }
+ }
+ Self::Offset(o) => {
+ if o >= &len {
+ Err(InvalidGetRange::StartTooLarge {
+ expected: *o,
+ actual: len,
+ })
+ } else {
+ Ok(*o..len)
+ }
+ }
+ Self::Suffix(n) => {
+ len.checked_sub(*n)
+ .map(|start| start..len)
+ .ok_or(InvalidGetRange::SuffixTooLarge {
+ expected: *n,
+ actual: len,
+ })
+ }
+ }
+ }
+
+ /// Merge two ranges which fall within a certain distance `coalesce` of
each other.
+ ///
+ /// Error if exactly one is a suffix or if the ranges are too far apart.
+ pub(crate) fn try_merge(
+ &self,
+ other: &GetRange,
+ coalesce: usize,
+ ) -> Result<Self, RangeMergeError> {
+ use GetRange::*;
+
+ let (g1, g2) = match self.partial_cmp(other) {
+ None => {
+ // One is a suffix, the other isn't.
+ // This is escapable if one represents the whole resource.
+ if let Some(whole) = self.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ if let Some(whole) = other.is_whole() {
+ if whole {
+ return Ok(GetRange::new_whole());
+ }
+ }
+ return Err(RangeMergeError::DifferentTypes);
+ }
+ Some(o) => match o {
+ std::cmp::Ordering::Greater => (other, self),
+ _ => (self, other),
+ },
+ };
+
+ match g1 {
+ Bounded(r1) => match g2 {
+ Bounded(r2) => {
+ if r2.start <= r1.end.saturating_add(coalesce) {
+ Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end)))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Offset(first) => {
+ if first < &(r1.end.saturating_add(coalesce)) {
+ Ok(GetRange::Offset(r1.start))
+ } else {
+ Err(RangeMergeError::Spread)
+ }
+ }
+ Suffix { .. } => unreachable!(),
+ },
+ // Either an offset or suffix, both of which would contain the
second range.
+ _ => Ok(g1.clone()),
+ }
+ }
+
+ pub fn matches_range(&self, range: Range<usize>, len: usize) -> bool {
+ match self {
+ Self::Bounded(r) => r == &range,
+ Self::Offset(o) => o == &range.start && len == range.end,
+ Self::Suffix(n) => range.end == len && range.start == len - n,
+ }
+ }
+}
+
+/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that
cover `ranges`,
+/// and a single [HttpRange::Suffix] if one or more are given.
+/// The suffix is also omitted if any of the ranges is the whole resource
(`0-`).
+///
+/// The suffix is returned separately as it may still overlap with the other
ranges,
+/// so the caller may want to handle it differently.
+pub fn merge_get_ranges<T: Into<GetRange> + Clone>(
+ ranges: &[T],
+ coalesce: usize,
+) -> (Vec<GetRange>, Option<GetRange>) {
+ if ranges.is_empty() {
+ return (vec![], None);
+ }
+ let mut v = Vec::with_capacity(ranges.len());
+ let mut o = None::<usize>;
+
+ for rng in ranges.iter().cloned().map(|r| r.into()) {
+ match rng {
+ GetRange::Suffix(n) => {
+ if let Some(suff) = o {
+ o = Some(suff.max(n));
+ } else {
+ o = Some(n);
+ }
+ }
+ _ => v.push(rng),
+ }
+ }
+
+ let mut ret = Vec::with_capacity(v.len() + 1);
+ let suff = o.map(|s| GetRange::Suffix(s));
+
+ if v.is_empty() {
+ if let Some(s) = suff.as_ref() {
+ ret.push(s.clone());
+ }
+ return (ret, suff);
+ }
+
+ // unwrap is fine because we've already filtered out the suffixes
+ v.sort_by(|a, b| a.partial_cmp(b).unwrap());
+ let mut it = v.into_iter();
+ let mut prev = it.next().unwrap();
+
+ for curr in it {
+ match prev {
+ GetRange::Offset { .. } => {
+ match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr);
+ }
+ }
+
+ let Some(s) = suff else {
+ return (ret, None);
+ };
+
+ if ret.last().unwrap().is_whole().unwrap() {
+ return (ret, None);
+ } else {
+ return (ret, Some(s));
+ }
+ }
+ GetRange::Bounded { .. } => match prev.try_merge(&curr, coalesce) {
+ Ok(r) => ret.push(r),
+ Err(_) => {
+ ret.push(prev);
+ ret.push(curr.clone());
+ }
+ },
+ GetRange::Suffix { .. } => unreachable!(),
+ }
+ prev = curr;
+ }
+
+ (ret, suff)
+}
+
+impl Display for GetRange {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ GetRange::Bounded(r) => f.write_fmt(format_args!("{}-{}", r.start,
r.end - 1)),
+ GetRange::Offset(o) => f.write_fmt(format_args!("{o}-")),
+ GetRange::Suffix(n) => f.write_fmt(format_args!("-{n}")),
+ }
+ }
+}
+
+impl<T: RangeBounds<usize>> From<T> for GetRange {
+ fn from(value: T) -> Self {
+ use std::ops::Bound::*;
+ let first = match value.start_bound() {
+ Included(i) => *i,
+ Excluded(i) => i + 1,
+ Unbounded => 0,
+ };
+ match value.end_bound() {
+ Included(i) => GetRange::Bounded(first..(i + 1)),
+ Excluded(i) => GetRange::Bounded(first..*i),
+ Unbounded => GetRange::Offset(first),
+ }
+ }
+}
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to
`fetch`
+/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
+pub async fn coalesce_get_ranges<F, E, Fut>(
+ ranges: &[GetRange],
+ fetch: F,
+ coalesce: usize,
+) -> Result<Vec<Bytes>, E>
+where
+ F: Send + FnMut(GetRange) -> Fut,
+ E: Send,
+ Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
+{
+ let (mut fetch_ranges, suff_opt) = merge_get_ranges(ranges, coalesce);
+ if let Some(suff) = suff_opt.as_ref() {
+ fetch_ranges.push(suff.clone());
+ }
+ if fetch_ranges.is_empty() {
+ return Ok(vec![]);
+ }
+
+ let mut fetched: Vec<_> =
futures::stream::iter(fetch_ranges.iter().cloned())
+ .map(fetch)
+ .buffered(OBJECT_STORE_COALESCE_PARALLEL)
+ .try_collect()
+ .await?;
+
+ let suff = suff_opt.map(|r| {
+ let nbytes = match r {
+ GetRange::Suffix(n) => n,
+ _ => unreachable!(),
+ };
+ fetch_ranges.pop().unwrap();
+ let b = fetched.pop().unwrap();
+ if nbytes >= b.len() {
+ b
+ } else {
+ b.slice((b.len() - nbytes)..)
+ }
+ });
+
+ Ok(ranges
+ .iter()
+ .map(|range| {
+ match range {
+ GetRange::Suffix(n) => {
+ let b = suff.as_ref().unwrap();
+ let start = b.len().saturating_sub(*n);
+ return b.slice(start..b.len());
+ }
+ _ => (),
+ }
+ // unwrapping range.first_byte() is fine because we've
early-returned suffixes
+ let idx = fetch_ranges
+ .partition_point(|v| v.first_byte().unwrap() <=
range.first_byte().unwrap())
+ - 1;
+ let fetch_range = &fetch_ranges[idx];
+ let fetch_bytes = &fetched[idx];
+
+ let start = range.first_byte().unwrap() -
fetch_range.first_byte().unwrap();
+ let end = range.last_byte().map_or(fetch_bytes.len(), |range_last|
{
+ fetch_bytes
+ .len()
+ .max(range_last - fetch_range.first_byte().unwrap() + 1)
+ });
+ fetch_bytes.slice(start..end)
+ })
+ .collect())
+}
+
+#[derive(Debug, Snafu)]
+pub enum InvalidRangeResponse {
+ #[snafu(display("Response was not PARTIAL_CONTENT; length {length:?}"))]
+ NotPartial { length: Option<usize> },
+ #[snafu(display("Content-Range header not present"))]
+ NoContentRange,
+ #[snafu(display("Content-Range header could not be parsed: {value:?}"))]
+ InvalidContentRange { value: Vec<u8> },
+}
+
+pub(crate) fn response_range(r: &Response) -> Result<Range<usize>,
InvalidRangeResponse> {
+ use InvalidRangeResponse::*;
+
+ if r.status() != StatusCode::PARTIAL_CONTENT {
+ return Err(NotPartial {
+ length: r.content_length().map(|s| s as usize),
+ });
+ }
+
+ let val_bytes = r
+ .headers()
+ .get(CONTENT_RANGE)
+ .ok_or(NoContentRange)?
+ .as_bytes();
+
+ match ContentRange::parse_bytes(val_bytes) {
Review Comment:
The format seems extremely simple -
https://www.rfc-editor.org/rfc/rfc9110#field.content-range
I therefore wonder if we can avoid needing another dependency?
Possibly something like (not tested)
```
/// Parses the byte range out of an HTTP `Content-Range` header value.
///
/// Accepts the `bytes <first>-<last>/<complete-length>` form. Whatever follows
/// the `/` (a length or `*`) is not inspected.
///
/// The header's last byte position is inclusive, so it is converted to the
/// exclusive end of the returned [`Range`].
///
/// Returns `None` if the `bytes ` prefix, the `/` or the `-` is missing, if
/// either position does not parse as a `usize`, or if the last position
/// precedes the first.
fn parse_content_range(s: &str) -> Option<Range<usize>> {
    let rem = s.strip_prefix("bytes ")?;
    let (range, _) = rem.split_once('/')?;
    let (start, end) = range.split_once('-')?;
    // `&str` has no `TryInto<usize>` impl; the positions must be parsed.
    let start: usize = start.parse().ok()?;
    let end: usize = end.parse().ok()?;
    if end < start {
        return None;
    }
    // Inclusive last byte -> exclusive range end, guarding against overflow.
    Some(start..end.checked_add(1)?)
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]