mingmwang commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r985173962
##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -250,48 +261,72 @@ fn send_fetch_partitions(
AbortableReceiverStream::create(response_receiver, join_handles)
}
-#[cfg(not(test))]
-async fn fetch_partition(
- location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
- let metadata = &location.executor_meta;
- let partition_id = &location.partition_id;
- // TODO for shuffle client connections, we should avoid creating new connections again and again.
- // And we should also avoid to keep alive too many connections for long time.
- let host = metadata.host.as_str();
- let port = metadata.port as u16;
- let mut ballista_client = BallistaClient::try_new(host, port)
- .await
- .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
- ballista_client
- .fetch_partition(
- &partition_id.job_id,
- partition_id.stage_id as usize,
- partition_id.partition_id as usize,
- &location.path,
- host,
- port,
- )
- .await
- .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
+/// Partition reader Trait, different partition reader can have
+#[async_trait]
+trait PartitionReader: Send + Sync + Clone {
+ // Read partition data from PartitionLocation
+ async fn fetch_partition(
+ &self,
+ location: &PartitionLocation,
+ ) -> result::Result<SendableRecordBatchStream, BallistaError>;
}
-#[cfg(test)]
-async fn fetch_partition(
- location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
- tests::fetch_test_partition(location)
+#[derive(Clone)]
+struct FlightPartitionReader {}
+
+#[async_trait]
+impl PartitionReader for FlightPartitionReader {
+ async fn fetch_partition(
+ &self,
+ location: &PartitionLocation,
+ ) -> result::Result<SendableRecordBatchStream, BallistaError> {
+ let metadata = &location.executor_meta;
+ let partition_id = &location.partition_id;
+ // TODO for shuffle client connections, we should avoid creating new connections again and again.
+ // And we should also avoid to keep alive too many connections for long time.
+ let host = metadata.host.as_str();
+ let port = metadata.port as u16;
+ let mut ballista_client =
+ BallistaClient::try_new(host, port)
+ .await
+ .map_err(|error| match error {
+ // map grpc connection error to partition fetch error.
+ BallistaError::GrpcConnectionError(msg) => {
+ BallistaError::FetchFailed(
+ metadata.id.clone(),
+ partition_id.stage_id,
+ partition_id.partition_id,
+ msg,
+ )
+ }
+ other => other,
+ })?;
+
+ ballista_client
+ .fetch_partition(&metadata.id, partition_id, &location.path, host, port)
+ .await
+ }
}
+#[allow(dead_code)]
+// TODO
+struct LocalPartitionReader {}
Review Comment:
> Nice! I assume where this is going is making the executors smart enough to
> read partitions on the same physical machine directly from disk.
Yes, exactly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]