alamb commented on code in PR #17364:
URL: https://github.com/apache/datafusion/pull/17364#discussion_r2325838608


##########
datafusion/datasource/src/url.rs:
##########
@@ -242,33 +242,64 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = 
exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match 
ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
+
+        async fn list_with_cache<'b>(
+            ctx: &'b dyn Session,
+            store: &'b dyn ObjectStore,
+            prefix: &'b Path,
+        ) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
+            match ctx.runtime_env().cache_manager.get_list_files_cache() {
+                None => Ok(store
+                    .list(Some(prefix))
+                    .map(|res| res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e))))

Review Comment:
   What is the reason for the `map` followed by an inner `map_err`? I think you can express the same thing directly with `TryStreamExt::map_err`, like this:
   
   ```suggestion
                       .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
   ```



##########
datafusion/datasource/src/url.rs:
##########
@@ -242,33 +242,64 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = 
exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match 
ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
+
+        async fn list_with_cache<'b>(

Review Comment:
   Is there any reason for this function to be defined inside an existing function? I think it would be clearer and easier to read (less indented) if you moved this function to its own method.



##########
datafusion/datasource/src/url.rs:
##########
@@ -242,33 +242,64 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = 
exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match 
ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
+
+        async fn list_with_cache<'b>(
+            ctx: &'b dyn Session,
+            store: &'b dyn ObjectStore,
+            prefix: &'b Path,
+        ) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
+            match ctx.runtime_env().cache_manager.get_list_files_cache() {
+                None => Ok(store
+                    .list(Some(prefix))
+                    .map(|res| res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e))))
+                    .boxed()),
                 Some(cache) => {
-                    if let Some(res) = cache.get(&self.prefix) {
+                    if let Some(res) = cache.get(prefix) {
                         debug!("Hit list all files cache");
-                        
futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
-                            .boxed()
+                        Ok(futures::stream::iter(
+                            res.as_ref().clone().into_iter().map(Ok),
+                        )
+                        .map(|res| {
+                            res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e)))
+                        })
+                        .boxed())
                     } else {
-                        let list_res = store.list(Some(&self.prefix));
-                        let vec = 
list_res.try_collect::<Vec<ObjectMeta>>().await?;
-                        cache.put(&self.prefix, Arc::new(vec.clone()));
-                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
+                        let vec = store
+                            .list(Some(prefix))
+                            .map(|res| {
+                                res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e)))
+                            })
+                            .try_collect::<Vec<ObjectMeta>>()
+                            .await?;
+                        cache.put(prefix, Arc::new(vec.clone()));
+                        Ok(futures::stream::iter(vec.into_iter().map(Ok))
+                            .map(|res| {
+                                res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e)))
+                            })
+                            .boxed())
                     }
                 }
-            },
-            false => futures::stream::once(store.head(&self.prefix)).boxed(),
+            }
+        }
+
+        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
+            list_with_cache(ctx, store, &self.prefix).await?
+        } else {
+            match store.head(&self.prefix).await {
+                Ok(meta) => futures::stream::once(async { Ok(meta) })
+                    .map(|res| res.map_err(|e| 
DataFusionError::ObjectStore(Box::new(e))))
+                    .boxed(),
+                Err(_) => list_with_cache(ctx, store, &self.prefix).await?,

Review Comment:
   Can we please add a comment that explains the rationale for the retry here? Something like:
   
   ```suggestion
                   // If the HEAD request fails (e.g. the object doesn't exist as a
                   // single file), retry as though it were a prefix and LIST it
                   Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
   ```
   
   Also, I wonder if we should only retry on `NotFound` errors: as written, this code retries on every error, including transient or permission failures.



##########
datafusion/datasource/src/url.rs:
##########
@@ -242,33 +242,64 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = 
exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match 
ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
+
+        async fn list_with_cache<'b>(

Review Comment:
   I played around with this function locally and think we can make it much simpler, so that it does not obscure the logic quite as much:
   
   ```rust
   async fn list_with_cache<'b>(
       ctx: &'b dyn Session,
       store: &'b dyn ObjectStore,
       prefix: &'b Path,
   ) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
       match ctx.runtime_env().cache_manager.get_list_files_cache() {
           None => Ok(store
               .list(Some(prefix))
               .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
               .boxed()),
           Some(cache) => {
               let vec = if let Some(res) = cache.get(prefix) {
                   debug!("Hit list all files cache");
                   res.as_ref().clone()
               } else {
                   let vec = store
                       .list(Some(prefix))
                       .try_collect::<Vec<ObjectMeta>>()
                       .await?;
                   cache.put(prefix, Arc::new(vec.clone()));
                   vec
               };
               Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
           }
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to