yahoNanJing commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r983141897
##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -252,48 +261,71 @@ fn send_fetch_partitions(
AbortableReceiverStream::create(response_receiver, join_handles)
}
-#[cfg(not(test))]
-async fn fetch_partition(
- location: &PartitionLocation,
-) -> result::Result<SendableRecordBatchStream, BallistaError> {
- let metadata = &location.executor_meta;
- let partition_id = &location.partition_id;
- // TODO for shuffle client connections, we should avoid creating new
connections again and again.
- // And we should also avoid to keep alive too many connections for long
time.
- let host = metadata.host.as_str();
- let port = metadata.port as u16;
- let mut ballista_client =
- BallistaClient::try_new(host, port)
- .await
- .map_err(|error| match error {
- // map grpc connection error to partition fetch error.
- BallistaError::GrpcConnectionError(msg) =>
BallistaError::FetchFailed(
- metadata.id.clone(),
- partition_id.stage_id,
- partition_id.partition_id,
- msg,
- ),
- other => other,
- })?;
-
- ballista_client
- .fetch_partition(&metadata.id, partition_id, &location.path, host,
port)
- .await
+/// Partition reader Trait, different partition reader can have
+#[async_trait]
+trait PartitionReader: Send + Sync + Clone {
Review Comment:
Trait is much more extensible than conditional compiling 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]