thinkharderdev commented on code in PR #261:
URL: https://github.com/apache/arrow-ballista/pull/261#discussion_r985138176


##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -110,6 +114,20 @@ pub struct ExecutionGraph {
     output_partitions: usize,
     /// Locations of this `ExecutionGraph` final output locations
     output_locations: Vec<PartitionLocation>,
+    /// Task ID generator, generate unique TID in the execution graph
+    task_id_gen: usize,
+    /// Failed stage attempts, record the failed stage attempts to limit the 
retry times.
+    /// Map from Stage ID -> Set<Stage_ATTPMPT_NUM>
+    failed_stage_attempts: HashMap<usize, HashSet<usize>>,

Review Comment:
   I don't get why we are saving a `HashSet` of attempts. Shouldn't it just be 
`usize`?



##########
ballista/rust/core/src/error.rs:
##########
@@ -182,13 +200,78 @@ impl Display for BallistaError {
             // }
             BallistaError::TonicError(desc) => write!(f, "Tonic error: {}", 
desc),
             BallistaError::GrpcError(desc) => write!(f, "Grpc error: {}", 
desc),
+            BallistaError::GrpcConnectionError(desc) => {
+                write!(f, "Grpc connection error: {}", desc)
+            }
             BallistaError::Internal(desc) => {
                 write!(f, "Internal Ballista error: {}", desc)
             }
             BallistaError::TokioError(desc) => write!(f, "Tokio join error: 
{}", desc),
+            BallistaError::GrpcActionError(desc) => {
+                write!(f, "Grpc Execute Action error: {}", desc)
+            }
+            BallistaError::FetchFailed(executor_id, map_stage, map_partition, 
desc) => {
+                write!(
+                    f,
+                    "Shuffle fetch partition error from Executor {}, map_stage 
{}, \
+                map_partition {}, error desc: {}",
+                    executor_id, map_stage, map_partition, desc
+                )
+            }
             BallistaError::Cancelled => write!(f, "Task cancelled"),
         }
     }
 }
 
+impl From<BallistaError> for FailedTask {
+    fn from(e: BallistaError) -> Self {
+        match e {
+            BallistaError::FetchFailed(
+                executor_id,
+                map_stage_id,
+                map_partition_id,
+                desc,
+            ) => {
+                FailedTask {
+                    error: desc,
+                    // fetch partition error is considered to be non-retryable
+                    retryable: false,
+                    count_to_failures: false,
+                    failed_reason: Some(FailedReason::FetchPartitionError(
+                        FetchPartitionError {
+                            executor_id,
+                            map_stage_id: map_stage_id as u32,
+                            map_partition_id: map_partition_id as u32,
+                        },
+                    )),
+                }
+            }
+            BallistaError::IoError(io) => {
+                FailedTask {
+                    error: format!("Task failed due to Ballista IO error: 
{:?}", io),
+                    // IO error is considered to be temporary and retryable
+                    retryable: true,
+                    count_to_failures: true,
+                    failed_reason: Some(FailedReason::IoError(IoError {})),
+                }
+            }
+            BallistaError::DataFusionError(DataFusionError::IoError(io)) => {
+                FailedTask {
+                    error: format!("Task failed due to DataFusion IO error: 
{:?}", io),
+                    // IO error is considered to be temporary and retryable
+                    retryable: true,
+                    count_to_failures: true,

Review Comment:
   I'm not sure I understand `retryable` and `count_to_failures`. If a task is 
retry-able, wouldn't it count to failures in all cases? Conversely, if 
it's not retry-able then we don't need to count the failure at all. 



##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -250,48 +261,72 @@ fn send_fetch_partitions(
     AbortableReceiverStream::create(response_receiver, join_handles)
 }
 
-#[cfg(not(test))]
-async fn fetch_partition(
-    location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
-    let metadata = &location.executor_meta;
-    let partition_id = &location.partition_id;
-    // TODO for shuffle client connections, we should avoid creating new 
connections again and again.
-    // And we should also avoid to keep alive too many connections for long 
time.
-    let host = metadata.host.as_str();
-    let port = metadata.port as u16;
-    let mut ballista_client = BallistaClient::try_new(host, port)
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-    ballista_client
-        .fetch_partition(
-            &partition_id.job_id,
-            partition_id.stage_id as usize,
-            partition_id.partition_id as usize,
-            &location.path,
-            host,
-            port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
+/// Partition reader Trait, different partition reader can have
+#[async_trait]
+trait PartitionReader: Send + Sync + Clone {
+    // Read partition data from PartitionLocation
+    async fn fetch_partition(
+        &self,
+        location: &PartitionLocation,
+    ) -> result::Result<SendableRecordBatchStream, BallistaError>;
 }
 
-#[cfg(test)]
-async fn fetch_partition(
-    location: &PartitionLocation,
-) -> Result<SendableRecordBatchStream> {
-    tests::fetch_test_partition(location)
+#[derive(Clone)]
+struct FlightPartitionReader {}
+
+#[async_trait]
+impl PartitionReader for FlightPartitionReader {
+    async fn fetch_partition(
+        &self,
+        location: &PartitionLocation,
+    ) -> result::Result<SendableRecordBatchStream, BallistaError> {
+        let metadata = &location.executor_meta;
+        let partition_id = &location.partition_id;
+        // TODO for shuffle client connections, we should avoid creating new 
connections again and again.
+        // And we should also avoid to keep alive too many connections for 
long time.
+        let host = metadata.host.as_str();
+        let port = metadata.port as u16;
+        let mut ballista_client =
+            BallistaClient::try_new(host, port)
+                .await
+                .map_err(|error| match error {
+                    // map grpc connection error to partition fetch error.
+                    BallistaError::GrpcConnectionError(msg) => {
+                        BallistaError::FetchFailed(
+                            metadata.id.clone(),
+                            partition_id.stage_id,
+                            partition_id.partition_id,
+                            msg,
+                        )
+                    }
+                    other => other,
+                })?;
+
+        ballista_client
+            .fetch_partition(&metadata.id, partition_id, &location.path, host, 
port)
+            .await
+    }
 }
 
+#[allow(dead_code)]
+// TODO
+struct LocalPartitionReader {}

Review Comment:
   Nice! I assume where this is going is making the executors smart enough to 
read partitions on the same physical machine directly from disk.



##########
ballista/rust/core/proto/ballista.proto:
##########
@@ -664,15 +698,48 @@ message RunningTask {
 
 message FailedTask {
   string error = 1;
+  bool retryable = 2;
+  // Whether this task failure should be counted to the maximum number of 
times the task is allowed to retry
+  bool count_to_failures = 3;
+  oneof failed_reason {
+    ExecutionError execution_error = 4;
+    FetchPartitionError fetch_partition_error = 5;
+    IOError io_error = 6;
+    ExecutorLost executor_lost = 7;
+    // A successful task's result is lost due to executor lost
+    ResultLost result_lost = 8;
+    TaskKilled task_killed = 9;
+  }
 }
 
-message CompletedTask {
+message SuccessfulTask {
   string executor_id = 1;
   // TODO tasks are currently always shuffle writes but this will not always 
be the case
   // so we might want to think about some refactoring of the task definitions
   repeated ShuffleWritePartition partitions = 2;
 }
 
+message ExecutionError {
+}
+
+message FetchPartitionError {
+  string executor_id = 1;
+  uint32 map_stage_id = 2;
+  uint32 map_partition_id = 3;
+}
+
+message IOError {
+}
+
+message ExecutorLost {
+}
+
+message ResultLost {
+}

Review Comment:
   Not sure I understand the difference between these two errors



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to