[ https://issues.apache.org/jira/browse/SPARK-42972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707143#comment-17707143 ]
liang yu edited comment on SPARK-42972 at 3/31/23 7:16 AM:
-----------------------------------------------------------

Using Structured Streaming with dynamic allocation enabled, there is a bug that can make the program hang. Here is how it happens:

{code:scala}
private def manageAllocation(): Unit = synchronized {
  logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
  if (batchProcTimeCount > 0) {
    val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    val ratio = averageBatchProcTime.toDouble / batchDurationMs
    // When the ratio is at or below scalingDownRatio, the client tries to kill executors.
    // If all executors then die unexpectedly, no batch can run any more.
    logInfo(s"Average: $averageBatchProcTime, ratio = $ratio")
    if (ratio >= scalingUpRatio) {
      logDebug("Requesting executors")
      val numNewExecutors = math.max(math.round(ratio).toInt, 1)
      requestExecutors(numNewExecutors)
    } else if (ratio <= scalingDownRatio) {
      logDebug("Killing executors")
      killExecutor()
    }
  }
  batchProcTimeSum = 0
  batchProcTimeCount = 0
  // With no executors, no batch completes, so batchProcTimeCount stays 0 and the branch
  // above (the only place requestExecutors() is called) is never entered again.
}
{code}

When the ratio is at or below scalingDownRatio, the client tries to kill executors. If all executors then die unexpectedly at the same time, the program hangs. No more batch jobs can complete, so batchProcTimeCount is 0 on every later call to manageAllocation() and the ratio check is skipped. Because the last action taken was to kill executors, requestExecutors() is never triggered again, and the application stays alive but makes no progress (suspended animation).
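The stall can be reproduced outside Spark with a small toy model. The sketch below is hypothetical: `ToyAllocationManager`, `tryRunBatch`, `loseAllExecutors`, `simulate` and the `guardAgainstZeroExecutors` flag are invented for illustration, and an integer counter stands in for `requestExecutors()`/`killExecutor()`. It only mirrors the control flow of the `manageAllocation()` method quoted above; the guard branch is one possible mitigation under these assumptions, not the fix adopted for SPARK-42972.

```scala
// Hypothetical toy model: all names are made up and only mirror the control flow
// of manageAllocation(). The `executors` counter stands in for live executors.
final class ToyAllocationManager(
    initialExecutors: Int,
    minNumExecutors: Int,
    batchDurationMs: Long,
    scalingUpRatio: Double,
    scalingDownRatio: Double,
    guardAgainstZeroExecutors: Boolean) {

  private var executors = initialExecutors
  private var batchProcTimeSum = 0L
  private var batchProcTimeCount = 0

  def numExecutors: Int = synchronized { executors }

  // External failure: every executor dies at once.
  def loseAllExecutors(): Unit = synchronized { executors = 0 }

  // A batch can only finish (and be counted) while at least one executor is alive.
  def tryRunBatch(procTimeMs: Long): Boolean = synchronized {
    if (executors > 0) {
      batchProcTimeSum += procTimeMs
      batchProcTimeCount += 1
      true
    } else false
  }

  def manageAllocation(): Unit = synchronized {
    if (batchProcTimeCount > 0) {
      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
      val ratio = averageBatchProcTime.toDouble / batchDurationMs
      if (ratio >= scalingUpRatio) {
        executors += math.max(math.round(ratio).toInt, 1) // stands in for requestExecutors()
      } else if (ratio <= scalingDownRatio && executors > minNumExecutors) {
        executors -= 1 // stands in for killExecutor()
      }
    } else if (guardAgainstZeroExecutors && executors == 0) {
      // Hypothetical guard: nothing completed because nothing can run,
      // so request capacity instead of waiting forever for a batch.
      executors = math.max(minNumExecutors, 1)
    }
    batchProcTimeSum = 0
    batchProcTimeCount = 0
  }
}

// Scale down once, lose every executor, then run `ticks` more allocation intervals.
def simulate(guard: Boolean, ticks: Int): Int = {
  val m = new ToyAllocationManager(
    initialExecutors = 2, minNumExecutors = 1, batchDurationMs = 1000L,
    scalingUpRatio = 0.9, scalingDownRatio = 0.3, guardAgainstZeroExecutors = guard)
  m.tryRunBatch(procTimeMs = 100L) // ratio 0.1 <= 0.3, so the next tick scales down to 1
  m.manageAllocation()
  m.loseAllExecutors()
  for (_ <- 1 to ticks) {
    m.tryRunBatch(procTimeMs = 100L) // returns false while executors == 0
    m.manageAllocation()
  }
  m.numExecutors
}

println(s"without guard: numExecutors = ${simulate(guard = false, ticks = 10)}") // prints 0
println(s"with guard: numExecutors = ${simulate(guard = true, ticks = 10)}")     // prints 1
```

Without the guard, the counter stays at 0 for every tick because `batchProcTimeCount` never becomes positive, which is the hang described above. Keying the guard on the live executor count, rather than on `batchProcTimeCount` alone, keeps it from firing when executors are alive but a batch simply has not finished within an allocation interval.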
> ExecutorAllocationManager cannot allocate new instances when all executors down.
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-42972
>                 URL: https://issues.apache.org/jira/browse/SPARK-42972
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.2
>            Reporter: Jiandan Yang
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)