andygrove commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650138952



##########
File path: ballista/rust/client/src/context.rs
##########
@@ -74,32 +71,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + 
Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-

Review comment:
       This is moved to ballista-core utils

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: 
{:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: 
{:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),
+            Arc::new(self.schema.as_ref().clone()),
+        );
+        Ok(Box::pin(result))
     }

Review comment:
       This is the main change

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: 
{:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: 
{:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       Once the basic shuffle mechanism is implemented, there will be a lot of 
optimization work to follow




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to