This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 521d30ccd25f611226f498324dec07591a44ed2a
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 20:34:51 2023 +0800

    optimize hive task progress
---
 .../hive/src/main/resources/log4j2.xml             | 11 +++++-
 .../hive/executor/HiveEngineConnExecutor.scala     | 43 ++++++++++++++++------
 2 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index 4d3b5855b..b56efdb36 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -34,6 +34,10 @@
     <Send name="Send" >
       <Filters>
+        <RegexFilter regex=".*Hive-on-MR is deprecated in Hive \d+ and may not be available in the future versions\..*" onMatch="DENY" onMismatch="NEUTRAL"/>
+        <RegexFilter regex=".*Hadoop command-line option parsing not.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+        <RegexFilter regex="Group.*is deprecated. Use.*instead" onMatch="DENY" onMismatch="NEUTRAL"/>
+        <RegexFilter regex="Failed to get files with ID; using regular API.*" onMatch="DENY" onMismatch="NEUTRAL"/>
         <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
       </Filters>
       <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
@@ -97,8 +101,11 @@
     <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
       <appender-ref ref="YarnAppIdOutputFile"/>
     </logger>
-    <logger name="org.apache.tez.client.TezClient" level="INFO" additivity="true">
-      <appender-ref ref="YarnAppIdOutputFile"/>
+    <logger name="org.apache.hadoop.hive.conf.HiveConf" level="ERROR" additivity="true">
+      <appender-ref ref="Send"/>
+    </logger>
+    <logger name="org.apache.hadoop.mapreduce.split.JobSplitWriter" level="ERROR" additivity="true">
+      <appender-ref ref="Send"/>
     </logger>
   </loggers>
 </configuration>
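The four RegexFilter entries above deny known-noisy Hive/Hadoop messages before the ThresholdFilter forwards WARN-and-above to the Send appender. As a minimal sketch of how those patterns behave (assuming log4j2's RegexFilter semantics of full-string matching against the log message, which is why most of the patterns are wrapped in .*), the standalone Scala below checks a few hypothetical sample messages against the same four regexes; the object name and sample strings are illustrative and not part of the commit:

import java.util.regex.Pattern

object NoisyLogFilterCheck {
  // The four patterns added to log4j2.xml above. RegexFilter matches the
  // whole message string, hence the leading/trailing .* on most patterns.
  private val patterns = Seq(
    ".*Hive-on-MR is deprecated in Hive \\d+ and may not be available in the future versions\\..*",
    ".*Hadoop command-line option parsing not.*",
    "Group.*is deprecated. Use.*instead",
    "Failed to get files with ID; using regular API.*"
  ).map(p => Pattern.compile(p))

  // Hypothetical sample messages, for illustration only.
  private val samples = Seq(
    "Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions.",
    "Hadoop command-line option parsing not performed.",
    "Total MapReduce jobs = 2" // should NOT be denied
  )

  def main(args: Array[String]): Unit =
    samples.foreach { msg =>
      val denied = patterns.exists(_.matcher(msg).matches())
      println(s"deny=$denied  $msg")
    }
}

A message is suppressed as soon as any pattern fully matches it; anything else falls through (NEUTRAL) to the ThresholdFilter.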
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index bf4eff0cb..abe32d76d 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
 import org.apache.hadoop.hive.ql.{Driver, QueryPlan}
+import org.apache.hadoop.hive.ql.exec.Task.TaskState
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
 import org.apache.hadoop.hive.ql.exec.tez.TezJobExecHelper
@@ -204,9 +205,11 @@ class HiveEngineConnExecutor(
     var compileRet = -1
     Utils.tryCatch {
       compileRet = driver.compile(realCode)
-      logger.info(s"driver compile realCode : ${realCode} finished, status : ${compileRet}")
+      logger.info(
+        s"driver compile realCode : \n ${realCode} \n finished, status : ${compileRet}"
+      )
       if (0 != compileRet) {
-        logger.warn(s"compile realCode : ${realCode} error status : ${compileRet}")
+        logger.warn(s"compile realCode : \n ${realCode} \n error status : ${compileRet}")
         throw HiveQueryFailedException(
           COMPILE_HIVE_QUERY_ERROR.getErrorCode,
           COMPILE_HIVE_QUERY_ERROR.getErrorDesc
@@ -469,17 +472,35 @@ class HiveEngineConnExecutor(
     val totalSQLs = engineExecutorContext.getTotalParagraph
     val currentSQL = engineExecutorContext.getCurrentParagraph
     val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float]
-    HadoopJobExecHelper.runningJobs synchronized {
-      HadoopJobExecHelper.runningJobs.asScala foreach { runningJob =>
-        val name = runningJob.getID.toString
-        val _progress = runningJob.reduceProgress() + runningJob.mapProgress()
-        singleSqlProgressMap.put(name, _progress / 2)
+    val finishedStage =
+      if (null != driver && null != driver.getPlan() && !driver.getPlan().getRootTasks.isEmpty) {
+        Utils.tryQuietly(
+          Utilities
+            .getMRTasks(driver.getPlan().getRootTasks)
+            .asScala
+            .count(task => task.isMapRedTask && task.getTaskState == TaskState.FINISHED)
+        )
+      } else {
+        0
       }
-    }
     var totalProgress: Float = 0.0f
+    if (!HadoopJobExecHelper.runningJobs.isEmpty) {
+      val runningJob = HadoopJobExecHelper.runningJobs.get(0)
+      val _progress = Utils.tryCatch(runningJob.reduceProgress() + runningJob.mapProgress()) {
+        case e: Exception =>
+          logger.info(s"Failed to get job(${runningJob.getJobName}) progress ", e)
+          0.2f
+      }
+      if (!_progress.isNaN) {
+        totalProgress = _progress / 2
+      }
+    }
+    logger.info(
+      s"Running stage progress is $totalProgress, and finished stage is $finishedStage"
+    )
     val hiveRunJobs = if (numberOfMRJobs <= 0) 1 else numberOfMRJobs
-    singleSqlProgressMap.asScala foreach { case (_name, _progress) =>
-      totalProgress += _progress
+    if (finishedStage <= hiveRunJobs) {
+      totalProgress = totalProgress + finishedStage
     }
     try {
       totalProgress = totalProgress / (hiveRunJobs * totalSQLs)
@@ -488,10 +509,10 @@ class HiveEngineConnExecutor(
       case _ =>
         totalProgress = 0.0f
     }
-    logger.debug(s"hive progress is $totalProgress")
     val newProgress =
       if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin
       else totalProgress + currentBegin
+    logger.info(s"Hive progress is $newProgress, and finished stage is $finishedStage")
     val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
     if (newProgress < oldProgress) oldProgress
     else {
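The reworked calculation replaces the old singleSqlProgressMap accumulation: it counts FINISHED map-reduce stages from the query plan, adds half of the running job's combined map/reduce progress (falling back to 0.2f if the progress call throws), normalizes by planned jobs times total SQLs, and offsets by the current SQL's starting fraction. Below is a minimal standalone sketch of that arithmetic with hypothetical inputs; the names mirror the patch, but HiveProgressSketch is illustrative, not the executor code:

object HiveProgressSketch {
  def progress(
      totalSQLs: Int,           // engineExecutorContext.getTotalParagraph
      currentSQL: Int,          // engineExecutorContext.getCurrentParagraph
      numberOfMRJobs: Int,      // MR stages planned for the current SQL
      finishedStage: Int,       // FINISHED MR tasks counted from the query plan
      runningJobProgress: Float // mapProgress() + reduceProgress() of the running job
  ): Float = {
    // Starting fraction of the current SQL within the whole script.
    val currentBegin = (currentSQL - 1) / totalSQLs.toFloat
    val hiveRunJobs = if (numberOfMRJobs <= 0) 1 else numberOfMRJobs
    // The running stage contributes at most 1 (map + reduce, halved);
    // each finished stage contributes 1, capped by the planned job count.
    var total = if (runningJobProgress.isNaN) 0.0f else runningJobProgress / 2
    if (finishedStage <= hiveRunJobs) total += finishedStage
    total = total / (hiveRunJobs * totalSQLs)
    if (total.isNaN || total.isInfinite) currentBegin else total + currentBegin
  }

  def main(args: Array[String]): Unit =
    // e.g. 2nd of 4 SQLs, 3 planned MR stages, 1 finished,
    // running job at 80% combined map + reduce progress
    println(progress(totalSQLs = 4, currentSQL = 2, numberOfMRJobs = 3,
      finishedStage = 1, runningJobProgress = 0.8f))
}

With these example inputs the call prints roughly 0.37: the 0.25 baseline for the second SQL plus its partial share. The executor additionally clamps the result against the previously reported progress so it never moves backwards.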
