This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 50cd8a952b4a26c4e8ce28204f1443e968d01a48
Author: peacewong <[email protected]>
AuthorDate: Thu May 5 18:02:58 2022 +0800

    Optimize the progress get logic of Hive and Spark
---
 .../computation/executor/utlis/ProgressUtils.scala | 41 ++++++++++++++++++++++
 .../hive/executor/HiveEngineConnExecutor.scala     |  9 +++--
 .../spark/executor/SparkEngineConnExecutor.scala   | 27 ++++----------
 3 files changed, 55 insertions(+), 22 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
new file mode 100644
index 000000000..695b783f3
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ProgressUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.executor.utlis
+
+import 
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+
+object ProgressUtils {
+  // Helpers that read and store a single progress value in an EngineExecutionContext's properties.
+  private val OLD_PROGRESS_KEY = "oldProgress" // property key under which the progress value is kept
+  /** Returns the value stored under OLD_PROGRESS_KEY, or 0f when the context is null or holds no value. */
+   def getOldProgress(engineExecutionContext: EngineExecutionContext): Float = 
{
+    if (null == engineExecutionContext) {
+      0f
+    } else {
+      val value = engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
+      if (null == value) 0f else value.asInstanceOf[Float] // NOTE(review): unchecked cast; presumably only putProgress writes this key - confirm
+    }
+  }
+  /** Stores newProgress under OLD_PROGRESS_KEY; does nothing when the context is null. */
+   def putProgress(newProgress: Float, engineExecutionContext: 
EngineExecutionContext): Unit = {
+    if (null != engineExecutionContext) {
+      engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, 
newProgress.asInstanceOf[AnyRef]) // the Float is boxed to AnyRef for the put call
+    }
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 68875f9ca..d91225d95 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -55,6 +55,7 @@ import 
org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.hive.conf.Counters
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
 import org.apache.commons.lang.StringUtils
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.governance.common.utils.JobUtils
 
 import scala.collection.JavaConversions._
@@ -405,8 +406,12 @@ class HiveEngineConnExecutor(id: Int,
       }
 
       logger.debug(s"hive progress is $totalProgress")
-      if (totalProgress.isNaN || totalProgress.isInfinite) return currentBegin
-      totalProgress + currentBegin
+      val newProgress = if (totalProgress.isNaN || totalProgress.isInfinite)  
currentBegin else totalProgress + currentBegin
+      val oldProgress = 
ProgressUtils.getOldProgress(this.engineExecutorContext)
+      if(newProgress < oldProgress) oldProgress else {
+        ProgressUtils.putProgress(newProgress, this.engineExecutorContext)
+        newProgress
+      }
     } else 0.0f
   }
 
diff --git 
a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 88f1f2b01..6da22b3a6 100644
--- 
a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -18,12 +18,10 @@
 package org.apache.linkis.engineplugin.spark.executor
 
 import org.apache.commons.lang3.StringUtils
-
-import java.util
-import java.util.concurrent.atomic.AtomicLong
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import 
org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, 
EngineExecutionContext}
+import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.spark.common.Kind
 import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
@@ -40,6 +38,8 @@ import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer.ExecuteResponse
 import org.apache.spark.SparkContext
 
+import java.util
+import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -48,7 +48,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, 
id: Long) extends C
 
   private var initialized: Boolean = false
 
-  private val OLD_PROGRESS_KEY = "oldProgress"
+
 
   private var jobGroup: String = _
 
@@ -112,28 +112,15 @@ abstract class SparkEngineConnExecutor(val sc: 
SparkContext, id: Long) extends C
   }
 
 
-  private def getOldProgress(): Float = {
-    if (null == this.engineExecutionContext) {
-      0f
-    } else {
-      val value = 
this.engineExecutionContext.getProperties.get(OLD_PROGRESS_KEY)
-      if (null == value) 0f else value.asInstanceOf[Float]
-    }
-  }
 
-  private def putProgress(newProgress: Float): Unit = {
-    if (null != this.engineExecutionContext) {
-      this.engineExecutionContext.getProperties.put(OLD_PROGRESS_KEY, 
newProgress.asInstanceOf[AnyRef])
-    }
-  }
 
-  override def progress(taskID: String): Float = if (jobGroup == null || 
engineExecutionContext.getTotalParagraph == 0) getOldProgress()
+  override def progress(taskID: String): Float = if (jobGroup == null || 
engineExecutionContext.getTotalParagraph == 0) 
ProgressUtils.getOldProgress(this.engineExecutionContext)
   else {
     val newProgress = (engineExecutionContext.getCurrentParagraph * 1f - 1f)/ 
engineExecutionContext.getTotalParagraph + JobProgressUtil.progress(sc, 
jobGroup)/engineExecutionContext.getTotalParagraph
     val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else 
newProgress
-    val oldProgress = getOldProgress()
+    val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
     if(normalizedProgress < oldProgress) oldProgress else {
-      putProgress(normalizedProgress)
+      ProgressUtils.putProgress(normalizedProgress, 
this.engineExecutionContext)
       normalizedProgress
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to