milenkovicm commented on code in PR #1503:
URL: 
https://github.com/apache/datafusion-ballista/pull/1503#discussion_r2929632106


##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -427,46 +414,44 @@ fn send_fetch_partitions(
 
     // keep local shuffle files reading in serial order for memory control.
     let response_sender_c = response_sender.clone();
-    let customize_endpoint_c = customize_endpoint.clone();
-    spawned_tasks.push(SpawnedTask::spawn(async move {
-        for p in local_locations {
-            let r = PartitionReaderEnum::Local
-                .fetch_partition(
-                    &p,
-                    max_message_size,
-                    flight_transport,
-                    customize_endpoint_c.clone(),
-                    use_tls,
-                )
-                .await;
-            if let Err(e) = response_sender_c.send(r).await {
-                error!("Fail to send response event to the channel due to {e}");
+
+    //
+    // fetching local partitions (read from file)
+    //
+
+    spawned_tasks.push(SpawnedTask::spawn_blocking({
+        move || {
+            for p in local_locations {
+                let r = fetch_partition_local(&p);
+                if let Err(e) = response_sender_c.blocking_send(r) {
+                    error!("Fail to send response event to the channel due to {e}");
+                }
             }
         }
     }));
 
+    //
+    // fetching remote partitions (uses grpc flight protocol)
+    //
     for p in remote_locations.into_iter() {
         let semaphore = semaphore.clone();
         let response_sender = response_sender.clone();
-        let customize_endpoint_c = customize_endpoint.clone();
-        spawned_tasks.push(SpawnedTask::spawn(async move {
-            // Block if exceeds max request number.
-            let permit = semaphore.acquire_owned().await.unwrap();
-            let r = PartitionReaderEnum::FlightRemote
-                .fetch_partition(
-                    &p,
-                    max_message_size,
-                    flight_transport,
-                    customize_endpoint_c,
-                    use_tls,
-                )
-                .await;
-            // Block if the channel buffer is full.
-            if let Err(e) = response_sender.send(r).await {
-                error!("Fail to send response event to the channel due to {e}");
+
+        spawned_tasks.push(SpawnedTask::spawn({
+            // TODO: make BallistaConfig cheaper to clone

Review Comment:
   Actually, there is already a `GrpcClientConfig`, which should be used here instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to