This is an automated email from the ASF dual-hosted git repository. wangzhen pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 50cd8a952b4a26c4e8ce28204f1443e968d01a48 Author: peacewong <[email protected]> AuthorDate: Thu May 5 18:02:58 2022 +0800 Optimize the progress get logic of Hive and Spark --- .../computation/executor/utlis/ProgressUtils.scala | 41 ++++++++++++++++++++++ .../hive/executor/HiveEngineConnExecutor.scala | 9 +++-- .../spark/executor/SparkEngineConnExecutor.scala | 27 ++++---------- 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala new file mode 100644 index 000000000..695b783f3 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconn.computation.executor.utlis + +import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext + +object ProgressUtils { + + private val OLD_PROGRESS_KEY = "oldProgress" + + def getOldProgress(engineExecutionContext: EngineExecutionContext): Float = { + if (null == engineExecutionContext) { + 0f + } else { + val value = engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY) + if (null == value) 0f else value.asInstanceOf[Float] + } + } + + def putProgress(newProgress: Float, engineExecutionContext: EngineExecutionContext): Unit = { + if (null != engineExecutionContext) { + engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef]) + } + } + +} diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala index 68875f9ca..d91225d95 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala @@ -55,6 +55,7 @@ import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor import org.apache.linkis.engineplugin.hive.conf.Counters import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus import org.apache.commons.lang.StringUtils +import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils import org.apache.linkis.governance.common.utils.JobUtils import scala.collection.JavaConversions._ @@ -405,8 +406,12 @@ class HiveEngineConnExecutor(id: Int, } logger.debug(s"hive progress is $totalProgress") - if (totalProgress.isNaN || totalProgress.isInfinite) return currentBegin - totalProgress + currentBegin + val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin else totalProgress + currentBegin + val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext) + if(newProgress < oldProgress) oldProgress else { + ProgressUtils.putProgress(newProgress, this.engineExecutorContext) + newProgress + } } else 0.0f } diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index 88f1f2b01..6da22b3a6 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -18,12 +18,10 @@ package org.apache.linkis.engineplugin.spark.executor import org.apache.commons.lang3.StringUtils - -import java.util -import java.util.concurrent.atomic.AtomicLong import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils} import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext} +import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor import org.apache.linkis.engineplugin.spark.common.Kind import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper @@ -40,6 +38,8 @@ import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.scheduler.executer.ExecuteResponse import org.apache.spark.SparkContext +import java.util +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -48,7 +48,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C private var initialized: Boolean = false - private val OLD_PROGRESS_KEY = "oldProgress" + private var jobGroup: String = _ @@ -112,28 +112,15 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C } - private def getOldProgress(): Float = { - if (null == this.engineExecutionContext) { - 0f - } else { - val value = this.engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY) - if (null == value) 0f else value.asInstanceOf[Float] - } - } - private def putProgress(newProgress: Float): Unit = { - if (null != this.engineExecutionContext) { - this.engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, newProgress.asInstanceOf[AnyRef]) - } - } - override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) getOldProgress() + override def progress(taskID: String): Float = if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) ProgressUtils.getOldProgress(this.engineExecutionContext) else { val newProgress = (engineExecutionContext.getCurrentParagraph * 1f - 1f)/ engineExecutionContext.getTotalParagraph + JobProgressUtil.progress(sc, jobGroup)/engineExecutionContext.getTotalParagraph val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress - val oldProgress = getOldProgress() + val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext) if(normalizedProgress < oldProgress) oldProgress else { - putProgress(normalizedProgress) + ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext) normalizedProgress } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
