This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new deb76087b refactor: refact some code and clear some Scala code format
alarm (#3698)
deb76087b is described below
commit deb76087bc65a73f293f4a9db073b44486feea09
Author: Jack Xu <[email protected]>
AuthorDate: Wed Nov 2 14:43:01 2022 +0800
refactor: refact some code and clear some Scala code format alarm (#3698)
---
.../org/apache/linkis/common/ServiceInstance.scala | 5 +-
.../apache/linkis/common/utils/ClassUtils.scala | 10 +++-
.../linkis/common/utils/ClassUtilsTest.scala | 1 -
.../core/launch/ProcessEngineCommandBuilder.scala | 3 +-
.../conf/EngineConnPluginConfiguration.scala | 7 +--
.../manager/label/entity/SerializableLabel.java | 11 ++--
.../label/utils/EngineTypeLabelCreator.java | 2 +-
.../linkis/manager/label/utils/LabelUtils.java | 34 ++++++------
.../common/entity/enumeration/NodeHealthy.java | 2 +-
.../common/entity/enumeration/NodeStatus.java | 28 ++--------
.../manager/common/entity/resource/Resource.scala | 61 ++++++++++------------
linkis-dist/pom.xml | 6 +--
.../flink/config/FlinkResourceConfiguration.scala | 15 +++---
.../flink/executor/FlinkExecutor.scala | 9 ++--
.../spark/Interpreter/PythonInterpreter.scala | 7 ++-
.../spark/factory/SparkEngineConnFactory.scala | 19 +++----
.../engineplugin/spark/utils/EngineUtils.scala | 10 ++++
.../server/src/main/assembly/distribution.xml | 11 ----
18 files changed, 107 insertions(+), 134 deletions(-)
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
index 5323602b7..8fcb4af73 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
@@ -25,12 +25,9 @@ class ServiceInstance {
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance
- def canEqual(other: Any): Boolean = other.isInstanceOf[ServiceInstance]
-
override def equals(other: Any): Boolean = other match {
case that: ServiceInstance =>
- (that canEqual this) &&
- applicationName == that.applicationName &&
+ applicationName == that.applicationName &&
instance == that.instance
case _ => false
}
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/ClassUtils.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/ClassUtils.scala
index b29b5354b..f357617b7 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/ClassUtils.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/ClassUtils.scala
@@ -28,8 +28,16 @@ object ClassUtils {
lazy val reflections =
new Reflections(Configuration.REFLECT_SCAN_PACKAGE,
this.getClass.getClassLoader)
+ /**
+ * Get the path of the class
+ * @param cls
+ * class type
+ * @return
+ * the resource path of cls
+ */
def jarOfClass(cls: Class[_]): Option[String] = {
- val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
+ val uri =
+ cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
if (uri != null) {
val uriStr = uri.toString
if (uriStr.startsWith("jar:file:")) {
diff --git
a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/ClassUtilsTest.scala
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/ClassUtilsTest.scala
index 590ec8c1a..1ef82f32b 100644
---
a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/ClassUtilsTest.scala
+++
b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/ClassUtilsTest.scala
@@ -33,7 +33,6 @@ class ClassUtilsTest {
"/" +
classOf[StringUtils].getName.replace('.', '/') + ".class"
)
- println(s"StringutilsUri is $uri")
assertEquals(
Some(uri.toString.substring("jar:file:".length,
uri.toString.indexOf("!"))),
someClass
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala
index eeb976bfa..fade444fa 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala
@@ -23,6 +23,7 @@ import
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta
import org.apache.commons.io.IOUtils
import java.io.OutputStream
+import java.nio.charset.StandardCharsets
trait ProcessEngineCommandBuilder {
@@ -55,7 +56,7 @@ abstract class ShellProcessEngineCommandBuilder extends
ProcessEngineCommandBuil
}
def writeTo(output: OutputStream): Unit = {
- IOUtils.write(sb, output)
+ IOUtils.write(sb, output, StandardCharsets.UTF_8)
}
override def replaceExpansionMarker(value: String): String = value
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-server/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-server/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala
index 81d0eb56b..de1add173 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-server/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-server/src/main/scala/org/apache/linkis/engineplugin/server/conf/EngineConnPluginConfiguration.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.common.conf.{CommonVars,
Configuration}
object EngineConnPluginConfiguration {
- val ENGINE_CONN_HOME = CommonVars(
+ val ENGINE_CONN_HOME: CommonVars[String] = CommonVars(
"wds.linkis.engineconn.home",
CommonVars[String](
"ENGINE_CONN_HOME",
@@ -29,9 +29,10 @@ object EngineConnPluginConfiguration {
).getValue
)
- val ENGINE_CONN_DIST_LOAD_ENABLE =
CommonVars("wds.linkis.engineconn.dist.load.enable", true)
+ val ENGINE_CONN_DIST_LOAD_ENABLE: CommonVars[Boolean] =
+ CommonVars("wds.linkis.engineconn.dist.load.enable", true)
- val ENABLED_BML_UPLOAD_FAILED_EXIT =
+ val ENABLED_BML_UPLOAD_FAILED_EXIT: CommonVars[Boolean] =
CommonVars("wds.linkis.engineconn.bml.upload.failed.enable", true)
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/SerializableLabel.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/SerializableLabel.java
index 8b94e2e23..30d4576c6 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/SerializableLabel.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/SerializableLabel.java
@@ -22,18 +22,19 @@ import org.apache.linkis.manager.label.utils.LabelUtils;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Serialize label value to string value and override the equals method */
public abstract class SerializableLabel<T> implements Label<T> {
- private static Logger LOG = LoggerFactory.getLogger(GenericLabel.class);
-
/** Cache the value names, weak hashMap */
private static final Map<Class<?>, List<String>> CACHE =
Collections.synchronizedMap(new WeakHashMap<>());
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
index 57fe62094..38eaa899f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
@@ -98,7 +98,7 @@ public class EngineTypeLabelCreator {
EngineTypeLabel label =
labelBuilderFactory.createLabel(EngineTypeLabel.class);
label.setEngineType(type);
String version = defaultVersion.get(type);
- if (!StringUtils.isEmpty(version)) {
+ if (StringUtils.isNotBlank(version)) {
label.setVersion(version);
} else {
label.setVersion("*");
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/LabelUtils.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/LabelUtils.java
index cd71e8836..7ff53c3c8 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/LabelUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/LabelUtils.java
@@ -28,12 +28,22 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.*;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,24 +64,12 @@ public class LabelUtils {
* @return
*/
public static boolean isBasicType(Class<?> clz) {
- return clz.equals(String.class)
- || clz.equals(Enum.class)
- || clz.isPrimitive()
- || isWrapClass(clz);
- }
-
- /**
- * If is wrap class
- *
- * @param clz class
- * @return
- */
- private static boolean isWrapClass(Class<?> clz) {
- try {
- return ((Class<?>) clz.getField("TYPE").get(null)).isPrimitive();
- } catch (Exception e) {
+ if (clz == null) {
return false;
}
+ return clz.equals(String.class)
+ || clz.equals(Enum.class)
+ || org.apache.commons.lang3.ClassUtils.isPrimitiveOrWrapper(clz);
}
/**
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeHealthy.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeHealthy.java
index 66293a4ca..9f40af3bb 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeHealthy.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeHealthy.java
@@ -20,7 +20,7 @@ package org.apache.linkis.manager.common.entity.enumeration;
public enum NodeHealthy {
/**
- * 节点监控状态信息 to monitor node status info Healthy:状态正常 UnHealthy:
EM自己标识自己为UnHealthy 或者
+ * 节点监控状态信息 to monitor node status info. Healthy:状态正常 UnHealthy:
EM自己标识自己为UnHealthy 或者
* manager把他标识为UnHealthy 处理引擎状态不正常,manager主动要求所有的engine强制退出(engine自杀) WARN:
引擎处于告警状态,但是可以接受任务
* StockAvailable: 存量可用状态,可以接受任务。当EM状态最近n次心跳没有上报,但是已经启动的Engine还是正常的可以接受任务
StockUnavailable:
* 存量不可用状态,不可以接受任务。(超过n+1没上报心跳)或者(EM自己判断,但是服务正常的情况),但是如果往上面提交任务会出现error失败情况
EM
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeStatus.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeStatus.java
index 691b96ebd..022ebeeed 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeStatus.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/enumeration/NodeStatus.java
@@ -17,6 +17,8 @@
package org.apache.linkis.manager.common.entity.enumeration;
+import org.apache.commons.lang3.StringUtils;
+
public enum NodeStatus {
/**
@@ -67,33 +69,11 @@ public enum NodeStatus {
}
public static NodeStatus toNodeStatus(String status) throws
IllegalArgumentException {
- if (null == status || "".equals(status)) {
+ if (StringUtils.isBlank(status)) {
throw new IllegalArgumentException(
"Invalid status : " + status + " cannot be matched in NodeStatus");
}
- switch (status) {
- case "Starting":
- return NodeStatus.Starting;
- case "ShuttingDown":
- return NodeStatus.ShuttingDown;
- case "Failed":
- return NodeStatus.Failed;
- case "Success":
- return NodeStatus.Success;
- case "Idle":
- return NodeStatus.Idle;
- case "Busy":
- return NodeStatus.Busy;
- case "Locked":
- return NodeStatus.Locked;
- case "Unlock":
- return NodeStatus.Unlock;
- case "Running":
- return NodeStatus.Running;
- default:
- throw new IllegalArgumentException(
- "Invalid status : " + status + " in all values in NodeStatus");
- }
+ return NodeStatus.valueOf(status);
}
public static NodeHealthy isEngineNodeHealthy(NodeStatus status) {
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala
index 53e066a5e..9d550cb41 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/scala/org/apache/linkis/manager/common/entity/resource/Resource.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils
import java.text.MessageFormat
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
import org.json4s.{CustomSerializer, DefaultFormats, Extraction}
import org.json4s.JsonAST.JObject
@@ -60,25 +60,25 @@ abstract class Resource {
def notLess(r: Resource): Boolean
- def +(r: Resource) = add(r)
+ def +(r: Resource): Resource = add(r)
- def -(r: Resource) = minus(r)
+ def -(r: Resource): Resource = minus(r)
- def *(rate: Float) = multiplied(rate)
+ def *(rate: Float): Resource = multiplied(rate)
- def *(rate: Double) = multiplied(rate.toFloat)
+ def *(rate: Double): Resource = multiplied(rate.toFloat)
- def /(rate: Int) = divide(rate)
+ def /(rate: Int): Resource = divide(rate)
- def ==(r: Resource) = equalsTo(r)
+ def ==(r: Resource): Boolean = equalsTo(r)
- def >(r: Resource) = moreThan(r)
+ def >(r: Resource): Boolean = moreThan(r)
- def >=(r: Resource) = notLess(r)
+ def >=(r: Resource): Boolean = notLess(r)
- def <(r: Resource) = ! >=(r)
+ def <(r: Resource): Boolean = ! >=(r)
- def <=(r: Resource) = ! >(r)
+ def <=(r: Resource): Boolean = ! >(r)
def toJson: String
}
@@ -445,15 +445,17 @@ class DriverAndYarnResource(
) {
logger.debug(s"Not module operate this:$this other:$r")
false
- } else
+ } else {
false
+ }
}
def isModuleOperate: Boolean = {
- if (this.yarnResource != null &&
StringUtils.isNotEmpty(this.yarnResource.queueName))
+ if (this.yarnResource != null &&
StringUtils.isNotEmpty(this.yarnResource.queueName)) {
false
- else
+ } else {
true
+ }
}
override def add(r: Resource): DriverAndYarnResource = {
@@ -565,7 +567,7 @@ class DriverAndYarnResource(
}
class SpecialResource(val resources: java.util.Map[String, AnyVal]) extends
Resource {
- def this(resources: Map[String, AnyVal]) =
this(JavaConversions.mapAsJavaMap(resources))
+ def this(resources: Map[String, AnyVal]) = this(resources.asJava)
private def specialResourceOperator(
r: Resource,
@@ -573,15 +575,10 @@ class SpecialResource(val resources:
java.util.Map[String, AnyVal]) extends Reso
): SpecialResource = r match {
case s: SpecialResource =>
val rs = s.resources
- new SpecialResource(
- JavaConversions
- .mapAsScalaMap(resources)
- .map { case (k, v) =>
- val v1 = rs.get(k)
- k -> op(v, v1)
- }
- .toMap
- )
+ new SpecialResource(resources.asScala.map { case (k, v) =>
+ val v1 = rs.get(k)
+ k -> op(v, v1)
+ }.toMap)
case _ => new SpecialResource(Map.empty[String, AnyVal])
}
@@ -637,8 +634,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
)
override def multiplied(rate: Float): Resource = new SpecialResource(
- JavaConversions
- .mapAsScalaMap(resources)
+ resources.asScala
.map {
case (k, i: Int) => k -> (i * rate).toInt
case (k, d: Double) => k -> d * rate
@@ -672,8 +668,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
)
override def divide(rate: Int): Resource = new SpecialResource(
- JavaConversions
- .mapAsScalaMap(resources)
+ resources.asScala
.map {
case (k, i: Int) => k -> i / rate
case (k, d: Double) => k -> d / rate
@@ -692,7 +687,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
override def moreThan(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
- !JavaConversions.mapAsScalaMap(resources).exists {
+ !resources.asScala.exists {
case (k, i: Int) => i <= rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d <= rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l <= rs.get(k).asInstanceOf[Long]
@@ -716,7 +711,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
override def caseMore(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
- JavaConversions.mapAsScalaMap(resources).exists {
+ resources.asScala.exists {
case (k, i: Int) => i > rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d > rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l > rs.get(k).asInstanceOf[Long]
@@ -734,7 +729,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
override def equalsTo(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
- !JavaConversions.mapAsScalaMap(resources).exists {
+ !resources.asScala.exists {
case (k, i: Int) => i != rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d != rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l != rs.get(k).asInstanceOf[Long]
@@ -752,7 +747,7 @@ class SpecialResource(val resources: java.util.Map[String,
AnyVal]) extends Reso
override def notLess(r: Resource): Boolean = r match {
case s: SpecialResource =>
val rs = s.resources
- !JavaConversions.mapAsScalaMap(resources).exists {
+ !resources.asScala.exists {
case (k, i: Int) => i < rs.get(k).asInstanceOf[Int]
case (k, d: Double) => d < rs.get(k).asInstanceOf[Double]
case (k, l: Long) => l < rs.get(k).asInstanceOf[Long]
@@ -852,7 +847,7 @@ object ResourceSerializer
)
)
case s: SpecialResource =>
- ("resources",
Serialization.write(JavaConversions.mapAsScalaMap(s.resources).toMap))
+ ("resources", Serialization.write(s.resources.asScala.toMap))
case r: Resource =>
throw new ResourceWarnException(
NOT_RESOURCE_TYPE.getErrorCode,
diff --git a/linkis-dist/pom.xml b/linkis-dist/pom.xml
index 1e086050f..a9d628d80 100644
--- a/linkis-dist/pom.xml
+++ b/linkis-dist/pom.xml
@@ -235,7 +235,7 @@
<exec executable="docker" failonerror="true">
<arg value="build"/>
<arg value="-f"/>
- <arg
value="${project.basedir}/docker/linkis.Dockerfile"/>
+ <arg value="${basedir}/docker/linkis.Dockerfile"/>
<arg value="-t"/>
<arg
value="${project.parent.artifactId}:${project.version}"/>
<arg value="--target"/>
@@ -288,7 +288,7 @@
<exec executable="docker" failonerror="true">
<arg value="build"/>
<arg value="-f"/>
- <arg
value="${project.basedir}/docker/linkis.Dockerfile"/>
+ <arg value="${basedir}/docker/linkis.Dockerfile"/>
<arg value="-t"/>
<arg
value="${project.parent.artifactId}-web:${project.version}"/>
<arg value="--target"/>
@@ -330,7 +330,7 @@
<exec executable="docker" failonerror="true">
<arg value="build"/>
<arg value="-f"/>
- <arg value="${project.basedir}/docker/ldh.Dockerfile"/>
+ <arg value="${basedir}/docker/ldh.Dockerfile"/>
<arg value="-t"/>
<arg
value="${project.parent.artifactId}-ldh:${project.version}"/>
<arg value="--target"/>
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
index 8c9a8effa..94db3f906 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
@@ -21,16 +21,17 @@ import org.apache.linkis.common.conf.CommonVars
object FlinkResourceConfiguration {
- val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory",
1024) // Unit: M(单位为M)
+ // Unit: M(单位为M)
+ val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 1024)
- val LINKIS_FLINK_CLIENT_CORES =
- 1 // Fixed to 1(固定为1) CommonVars[Int]("wds.linkis.driver.cores", 1)
+ // Fixed to 1(固定为1) CommonVars[Int]("wds.linkis.driver.cores", 1)
+ val LINKIS_FLINK_CLIENT_CORES = 1
- val LINKIS_FLINK_JOB_MANAGER_MEMORY =
- CommonVars[Int]("flink.jobmanager.memory", 1024) // Unit: M(单位为M)
+ // Unit: M(单位为M)
+ val LINKIS_FLINK_JOB_MANAGER_MEMORY =
CommonVars[Int]("flink.jobmanager.memory", 1024)
- val LINKIS_FLINK_TASK_MANAGER_MEMORY =
- CommonVars[Int]("flink.taskmanager.memory", 4096) // Unit: M(单位为M)
+ // Unit: M(单位为M)
+ val LINKIS_FLINK_TASK_MANAGER_MEMORY =
CommonVars[Int]("flink.taskmanager.memory", 4096)
val LINKIS_FLINK_TASK_SLOTS =
CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
val LINKIS_FLINK_TASK_MANAGER_CPU_CORES =
CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
index fcf973295..fd2827da6 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.flink.executor
import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.io.resultset.ResultSetWriter
-import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.common.utils.OverloadUtils
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import org.apache.linkis.engineconn.executor.entity.{LabelExecutor,
ResourceExecutor, YarnExecutor}
import
org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultSet
@@ -36,7 +36,6 @@ import org.apache.linkis.storage.domain.{Column, DataType}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-import org.apache.commons.io.IOUtils
import org.apache.flink.configuration.{CoreOptions, JobManagerOptions,
TaskManagerOptions}
import org.apache.flink.types.Row
import org.apache.flink.yarn.configuration.YarnConfigOptions
@@ -109,19 +108,19 @@ trait FlinkExecutor extends YarnExecutor with
LabelExecutor with ResourceExecuto
object FlinkExecutor {
- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
def writeResultSet(
resultSet: ResultSet,
resultSetWriter: ResultSetWriter[_ <: MetaData, _ <: Record]
): Unit = {
- val columns = resultSet.getColumns
+ val columns = resultSet.getColumns.asScala
.map(columnInfo => Column(columnInfo.getName,
DataType.toDataType(columnInfo.getType), null))
.toArray
resultSetWriter.addMetaData(new TableMetaData(columns))
resultSet.getData match {
case data: util.List[Row] =>
- data.foreach { row =>
+ data.asScala.foreach { row =>
val record =
(0 until
row.getArity).map(row.getField).map(FlinkValueFormatUtil.formatValue).toArray
resultSetWriter.addRecord(new TableRecord(record))
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
index f6f5a7d67..4223db8ba 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
@@ -19,10 +19,9 @@ package org.apache.linkis.engineplugin.spark.Interpreter
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.io.FsPath
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.{ClassUtils, Logging, Utils}
import org.apache.linkis.engineplugin.spark.common.LineBufferedStream
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
-import org.apache.linkis.engineplugin.spark.utils.EngineUtils
import org.apache.linkis.storage.FSFactory
import org.apache.commons.io.IOUtils
@@ -76,7 +75,7 @@ object PythonInterpreter {
override def accept(pathname: File): Boolean =
pathname.getName.endsWith(".zip")
})
.foreach(f => pythonPath += f.getPath)
- EngineUtils.jarOfClass(classOf[SparkContext]).foreach(pythonPath += _)
+ ClassUtils.jarOfClass(classOf[SparkContext]).foreach(pythonPath += _)
pythonPath.mkString(File.pathSeparator)
}
@@ -200,7 +199,7 @@ private class PythonInterpreter(process: Process,
gatewayServer: GatewayServer)
override def accept(pathname: File): Boolean =
pathname.getName.endsWith(".zip")
})
.foreach(f => pythonPath += f.getPath)
- EngineUtils.jarOfClass(classOf[SparkContext]).foreach(pythonPath += _)
+ ClassUtils.jarOfClass(classOf[SparkContext]).foreach(pythonPath += _)
pythonPath.mkString(File.pathSeparator)
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index 32ba4d0cf..cb768301d 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -59,22 +59,17 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
logger.info(s"------ Create new SparkContext {$master} -------")
val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue
val pysparkPath = new File(pysparkBasePath, "python" + File.separator +
"lib")
- val pythonLibUris =
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
+ var pythonLibUris =
pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip"))
if (pythonLibUris.length == 2) {
val sparkConfValue1 =
Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
val sparkConfValue2 =
Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
- if (StringUtils.isEmpty(sparkConfValue1) &&
StringUtils.isEmpty(sparkConfValue2)) {
- sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
- } else if (StringUtils.isEmpty(sparkConfValue1)) {
- sparkConf.set("spark.yarn.dist.files", sparkConfValue2 + "," +
pythonLibUris.mkString(","))
- } else if (StringUtils.isEmpty(sparkConfValue2)) {
- sparkConf.set("spark.yarn.dist.files", sparkConfValue1 + "," +
pythonLibUris.mkString(","))
- } else {
- sparkConf.set(
- "spark.yarn.dist.files",
- sparkConfValue1 + "," + sparkConfValue2 + "," +
pythonLibUris.mkString(",")
- )
+ if (StringUtils.isNotBlank(sparkConfValue2)) {
+ pythonLibUris = sparkConfValue2 +: pythonLibUris
}
+ if (StringUtils.isNotBlank(sparkConfValue1)) {
+ pythonLibUris = sparkConfValue1 +: pythonLibUris
+ }
+ sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
}
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
index 8c197ae69..f4508104e 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
@@ -78,6 +78,16 @@ object EngineUtils extends Logging {
sparkVersion
}
+ /**
+ * Get the path of the class
+ * @param cls
+ * class type
+ * @return
+ * the resource path of cls
+ * @deprecated
+ * use org.apache.linkis.common.utils.ClassUtils::jarOfClass
+ */
+ @deprecated
def jarOfClass(cls: Class[_]): Option[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
if (uri != null) {
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/assembly/distribution.xml
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/assembly/distribution.xml
index 21f2c3ada..2cb928fb9 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/assembly/distribution.xml
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/assembly/distribution.xml
@@ -84,17 +84,6 @@
<exclude>*-javadoc.jar</exclude>
</excludes>
</fileSet>
- <fileSet>
- <directory>${basedir}/../service/tdsql/target/out/lib</directory>
- <fileMode>0755</fileMode>
- <outputDirectory>lib/service/tdsql</outputDirectory>
- <includes>
- <include>*.jar</include>
- </includes>
- <excludes>
- <exclude>*-javadoc.jar</exclude>
- </excludes>
- </fileSet>
<fileSet>
<directory>${basedir}/../service/hdfs/target/out/lib</directory>
<fileMode>0755</fileMode>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]