alamb opened a new issue, #15067:
URL: https://github.com/apache/datafusion/issues/15067

   ### Describe the bug
   
   Related to
   - https://github.com/apache/datafusion/issues/12393
   - https://github.com/apache/datafusion/pull/14286
   - https://github.com/delta-io/delta-rs/issues/2595
   
   When I try to read one of the ClickBench parquet files directly from a
   remote object store (see example below) over a slow internet connection,
   I get the following error:
   
   > called `Result::unwrap()` on an `Err` value: ArrowError(ExternalError(External(Generic { store: "HTTP", source: reqwest::Error { kind: Decode, source: reqwest::Error { kind: Body, source: TimedOut } } })), None)
   
   My example just reads the data back (it is not doing any CPU intensive 
processing).
   
   This is very similar to the reports @ion-elgreco has fielded in delta-rs
   - https://github.com/delta-io/delta-rs/issues/2595
   
   
   
   ### To Reproduce
   
   Run this program that just tries to read the file (on a crappy internet 
connection)
   
   ```rust
   use std::sync::Arc;
   use std::time::Instant;

   use datafusion::execution::object_store::ObjectStoreUrl;
   use datafusion::prelude::*;
   use futures_util::StreamExt; // for `stream.next()`
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       let ctx = SessionContext::new();
       let object_store_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
       let object_store = object_store::http::HttpBuilder::new()
           .with_url(object_store_url.as_str())
           .build()
           .unwrap();
   
       let max_request_size = 1*1024*1024; // 1MB
       //let object_store = LimitedRequestSizeObjectStore::new(Arc::new(object_store), max_request_size);
       //let object_store= ReportingObjectStore::new(Arc::new(object_store));
   
       ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
   
       let start = Instant::now();
       let options = ParquetReadOptions::new();
       let df = ctx
           .read_parquet(
               "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet",
               options,
           )
           .await
           .unwrap();
       println!("Created table and plan in {:?}", Instant::now() - start);
       df.clone().explain(false, false).unwrap().show().await.unwrap();
   
       println!("running query");
       let start = Instant::now();
       let mut stream = df.execute_stream().await.unwrap();
       let mut total_rows = 0;
       let mut total_batches = 0;
       while let Some(batch) = stream.next().await {
           let batch = batch.unwrap();
           total_rows += batch.num_rows();
           total_batches += 1;
           println!("{:?} batches: {total_batches} rows: {total_rows}", 
Instant::now() - start);
       }
   
       println!("Done");
       Ok(())
   }
   ```
   
   This results in the following output:
   ```
   running query
   3.753874792s batches: 1 rows: 8192
   3.756978125s batches: 2 rows: 16384
   3.761664292s batches: 3 rows: 24576
   3.765139625s batches: 4 rows: 32768
   3.768803333s batches: 5 rows: 40960
   3.771689042s batches: 6 rows: 49152
   3.773665958s batches: 7 rows: 57344
   3.774663333s batches: 8 rows: 62734
   
   thread 'main' panicked at src/main.rs:59:27:
   called `Result::unwrap()` on an `Err` value: 
ArrowError(ExternalError(External(Generic { store: "HTTP", source: 
reqwest::Error { kind: Decode, source: reqwest::Error { kind: Body, source: 
TimedOut } } })), None)
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   ```
   
   
   ### Expected behavior
   
   I expect the query to complete without error
   
   ### Additional context
   
   When I added an `ObjectStore` wrapper that reported which requests were being
   made to the underlying storage system, I found that DataFusion makes a single
   "large" request for 93MB. Given the bandwidth of the coffee shop wifi, this
   request cannot complete within the default 30 second request timeout.
   
   Thus the request times out and the error is returned to the client.
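   
   To make the arithmetic concrete, here is a small sketch (standard library
   only) of why a single large request cannot finish in time. The 1MB/s
   bandwidth is an assumed figure for illustration, since the actual speed of the
   connection is not stated, and `transfer_time` and `fits_in_timeout` are
   hypothetical helper names, not part of DataFusion or `object_store`.
   
   ```rust
   use std::time::Duration;
   
   /// Time needed to transfer `bytes` at `bytes_per_sec` (must be non-zero),
   /// ignoring latency and protocol overhead.
   fn transfer_time(bytes: u64, bytes_per_sec: u64) -> Duration {
       Duration::from_secs_f64(bytes as f64 / bytes_per_sec as f64)
   }
   
   /// Whether a single request for `bytes` can finish within `timeout`.
   fn fits_in_timeout(bytes: u64, bytes_per_sec: u64, timeout: Duration) -> bool {
       transfer_time(bytes, bytes_per_sec) <= timeout
   }
   
   fn main() {
       const MB: u64 = 1024 * 1024;
       let timeout = Duration::from_secs(30);
       // Assumed link speed for illustration only.
       let bandwidth = MB;
   
       // One 93MB request needs about 93s at 1MB/s, well past the 30s timeout.
       println!("93MB takes {:?}", transfer_time(93 * MB, bandwidth));
       assert!(!fits_in_timeout(93 * MB, bandwidth, timeout));
   
       // A 1MB request needs about 1s and completes comfortably.
       assert!(fits_in_timeout(MB, bandwidth, timeout));
   }
   ```
   
   Under these assumptions, any request larger than about 30MB would hit the
   timeout, which is why limiting the size of each request helps.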
   
   I was able to make the query work by writing another `ObjectStore` wrapper that
   splits the single 93MB request into multiple smaller requests. With that
   wrapper in place, my program completes.
   
   
   
   <details><summary>Click here to see the idea (horrible code, I am 
sorry)</summary>
   <p>
   
   ```rust
   use std::collections::VecDeque;
   use std::fmt::Display;
   use std::ops::Range;
   use std::sync::Arc;
   use std::sync::atomic::AtomicUsize;
   use bytes::Bytes;
   use futures_util::stream::BoxStream;
   use object_store::{GetOptions, GetResult, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult};
   use object_store::path::Path;
   
   /// This is an ObjectStore wrapper that limits the size of individual requests
   /// to `maximum_request_size` bytes.
   ///
   /// This is mostly useful for connections that have limited bandwidth, so that
   /// each request completes before a timeout is reached.
   ///
   /// This requires more requests, but each request is smaller and more likely to
   /// complete within the timeout.
   ///
   /// For example, if the timeout is 30 seconds and the bandwidth is 1MB/s,
   /// a single request for 100MB would take around 100s, exceeding the timeout.
   ///
   #[derive(Debug)]
   pub struct LimitedRequestSizeObjectStore {
       inner: Arc<dyn ObjectStore>,
       maximum_request_size: usize,
       next_id: AtomicUsize,
   }
   
   impl Display for LimitedRequestSizeObjectStore {
       fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
           write!(f, "LimitedRequestSizeObjectStore({}, maximum_request_size={})", self.inner, self.maximum_request_size)
       }
   }
   
   impl LimitedRequestSizeObjectStore {
       pub fn new(inner: Arc<dyn ObjectStore>, maximum_request_size: usize) -> 
Self {
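           // a zero limit would make `split_ranges` loop forever
           assert!(maximum_request_size > 0, "maximum_request_size must be non-zero");
           // start at 1000000 to make it easy to distinguish from the other object stores
           let next_id = AtomicUsize::new(1000000);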
           Self { inner, maximum_request_size, next_id }
       }
   
       /// Splits a slice of ranges into groups of ranges so that each group totals
       /// no more than `maximum_request_size` bytes
       fn split_ranges(&self, ranges: &[Range<usize>]) -> Vec<Vec<Range<usize>>> {
           println!("input split ranges {:?}", ranges);
           let mut remaining_ranges = 
ranges.iter().cloned().collect::<VecDeque<_>>();
           let mut split_ranges = vec![];
           let mut current_ranges = vec![];
           let mut remaining_bytes = self.maximum_request_size;
           while let Some(range) = remaining_ranges.pop_front() {
               // if there is space left in the current output range, take the whole range
               if range.len() < remaining_bytes {
                   remaining_bytes -= range.len();
                   current_ranges.push(range);
               }
               // otherwise, need to split the range and put it back on for next time
               else {
                   let start_range = range.start..range.start+remaining_bytes;
                   let end_range = range.start+remaining_bytes..range.end;
                   current_ranges.push(start_range);
                   // skip an empty remainder (the range exactly filled the request)
                   if !end_range.is_empty() {
                       remaining_ranges.push_front(end_range);
                   }
   
                   // current ranges is full, so add it to the output and start a new one
                   assert_eq!(current_ranges.iter().map(|r| r.len()).sum::<usize>(), self.maximum_request_size);
                   split_ranges.push(current_ranges);
                   current_ranges = vec![];
                   remaining_bytes = self.maximum_request_size;
               }
           }
           if !current_ranges.is_empty() {
               split_ranges.push(current_ranges);
           }
           println!("output split ranges {:?}", split_ranges);
           split_ranges
       }
   
   }
   
   
   
   #[async_trait::async_trait]
   impl ObjectStore for LimitedRequestSizeObjectStore {
       async fn put(&self, location: &Path, payload: PutPayload) -> 
object_store::Result<PutResult> {
           self.inner.put(location, payload).await
       }
   
       async fn put_opts(&self, location: &Path, payload: PutPayload, opts: 
PutOptions) -> object_store::Result<PutResult> {
           self.inner.put_opts(location, payload, opts).await
       }
   
       async fn put_multipart(&self, location: &Path) -> 
object_store::Result<Box<dyn MultipartUpload>> {
           self.inner.put_multipart(location).await
       }
   
       async fn put_multipart_opts(&self, location: &Path, opts: 
PutMultipartOpts) -> object_store::Result<Box<dyn MultipartUpload>> {
           self.inner.put_multipart_opts(location, opts).await
       }
   
       async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
           println!("LimitedRequestSizeObjectStore begin get: {}", location);
           let result = self.inner.get(location).await?;
           println!("LimitedRequestSizeObjectStore get: Read {} bytes from {}", 
result.meta.size, result.meta.location);
           Ok(result)
       }
   
       async fn get_opts(&self, location: &Path, options: GetOptions) -> 
object_store::Result<GetResult> {
           println!("LimitedRequestSizeObjectStore begin get: {} ({:?})", 
location, options);
           let result = self.inner.get_opts(location, options).await?;
           println!("LimitedRequestSizeObjectStore get: Read {} bytes from {}", 
result.meta.size, result.meta.location);
           Ok(result)
       }
   
       async fn get_range(&self, location: &Path, range: Range<usize>) -> 
object_store::Result<Bytes> {
           let id = self.next_id.fetch_add(1, 
std::sync::atomic::Ordering::SeqCst);
           let total_bytes = range.len();
           let mbytes = total_bytes/ 1024 / 1024;
           println!("LimitedRequestSizeObjectStore id={id} get_range: {} {mbytes}MB {total_bytes} bytes", location);
   
           let mut res = self.get_ranges(location, &[range]).await?;
           println!("LimitedRequestSizeObjectStore id={id} get_range complete");
           assert_eq!(res.len(), 1);
           Ok(res.pop().unwrap())
       }
   
       async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> 
object_store::Result<Vec<Bytes>> {
           let id = self.next_id.fetch_add(1, 
std::sync::atomic::Ordering::SeqCst);
           let total_bytes: usize = ranges.iter().map(|r| r.len()).sum();
           let mbytes = total_bytes/ 1024 / 1024;
           println!("LimitedRequestSizeObjectStore id={id} get_ranges: {} {mbytes}MB {total_bytes} bytes", location);
   
           let split_ranges = self.split_ranges(ranges);
           println!("  Have {} split ranges", split_ranges.len());
   
           // perform a request for each range and put them back together at the end
           let mut overall_result = Vec::with_capacity(total_bytes);
           for split_range in split_ranges {
               let total_range_bytes: usize = split_range.iter().map(|r| r.len()).sum();
               println!("  requesting {} inner ranges {total_range_bytes} bytes", split_range.len());
               let responses = self.inner.get_ranges(location, &split_range).await?;
               for response in responses {
                   overall_result.extend(response.as_ref());
               }
           }
           assert_eq!(overall_result.len(), total_bytes);
   
           // convert it to Bytes and slice it up into the original ranges
           let overall_result = Bytes::from(overall_result);
           let mut result_ranges = vec![];
           let mut offset = 0; // start of the current range within `overall_result`
           for range in ranges {
               let slice = overall_result.slice(offset..offset+range.len());
               offset += range.len();
               result_ranges.push(slice);
           }
           println!("LimitedRequestSizeObjectStore id={id} get_ranges complete");
           Ok(result_ranges)
       }
   
       async fn head(&self, location: &Path) -> 
object_store::Result<ObjectMeta> {
           self.inner.head(location).await
       }
   
       async fn delete(&self, location: &Path) -> object_store::Result<()> {
           self.inner.delete(location).await
       }
   
       fn delete_stream<'a>(&'a self, locations: BoxStream<'a, 
object_store::Result<Path>>) -> BoxStream<'a, object_store::Result<Path>> {
           self.inner.delete_stream(locations)
       }
   
       fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, 
object_store::Result<ObjectMeta>> {
           self.inner.list(prefix)
       }
   
       fn list_with_offset(&self, prefix: Option<&Path>, offset: &Path) -> 
BoxStream<'_, object_store::Result<ObjectMeta>> {
           self.inner.list_with_offset(prefix, offset)
       }
   
       async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
object_store::Result<ListResult> {
           self.inner.list_with_delimiter(prefix).await
       }
   
       async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> 
{
           self.inner.copy(from, to).await
       }
   
       async fn rename(&self, from: &Path, to: &Path) -> 
object_store::Result<()> {
           self.inner.rename(from, to).await
       }
   
       async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> 
object_store::Result<()> {
           self.inner.copy_if_not_exists(from, to).await
       }
   
       async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> 
object_store::Result<()> {
           self.inner.rename_if_not_exists(from, to).await
       }
   }
   
   
   
   ```
   
   </p>
   </details> 
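   
   The range-splitting step of the wrapper can also be exercised on its own,
   without any object store. The following is a minimal standalone sketch of the
   same idea using only the standard library. The free function `split_ranges`
   and its `max_bytes` parameter are illustrative names, and the logic is a
   simplified restatement rather than the exact code above.
   
   ```rust
   use std::collections::VecDeque;
   use std::ops::Range;
   
   /// Groups `ranges` into batches whose total length is at most `max_bytes`,
   /// splitting any range that straddles a batch boundary.
   fn split_ranges(ranges: &[Range<usize>], max_bytes: usize) -> Vec<Vec<Range<usize>>> {
       assert!(max_bytes > 0, "max_bytes must be non-zero");
       let mut pending: VecDeque<Range<usize>> = ranges.iter().cloned().collect();
       let mut batches = vec![];
       let mut current = vec![];
       let mut remaining = max_bytes;
       while let Some(range) = pending.pop_front() {
           if range.len() <= remaining {
               // The whole range fits in the current batch.
               remaining -= range.len();
               current.push(range);
           } else {
               // Take what fits and put the rest back for the next batch.
               let mid = range.start + remaining;
               current.push(range.start..mid);
               pending.push_front(mid..range.end);
               remaining = 0;
           }
           if remaining == 0 {
               // The current batch is full, so emit it and start a new one.
               batches.push(std::mem::take(&mut current));
               remaining = max_bytes;
           }
       }
       if !current.is_empty() {
           batches.push(current);
       }
       batches
   }
   
   fn main() {
       // A single 10-byte range with a 4-byte limit becomes three requests.
       assert_eq!(
           split_ranges(&[0..10], 4),
           vec![vec![0..4], vec![4..8], vec![8..10]]
       );
       // Small ranges are packed together; 10..14 is split across two batches.
       assert_eq!(
           split_ranges(&[0..3, 10..14], 5),
           vec![vec![0..3, 10..12], vec![12..14]]
       );
   }
   ```
   
   Each inner `Vec` corresponds to one call to the underlying store. The wrapper
   then concatenates the responses and slices them back into the originally
   requested ranges.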
   
   
   

