This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 2f9f9bbf0 flink load default configuration (#5025)
2f9f9bbf0 is described below
commit 2f9f9bbf0ab01f89a793d79f0672024b401a229d
Author: yangwenzea <[email protected]>
AuthorDate: Fri Dec 8 23:25:58 2023 +0800
flink load default configuration (#5025)
* flink load default configuration
* fix gc log bug
* code format
---
.../flink/config/FlinkEnvConfiguration.scala | 7 +++
.../flink/factory/FlinkEngineConnFactory.scala | 54 ++++++++++++++++++--
.../flink/util/FlinkValueFormatUtil.scala | 57 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 3 deletions(-)
diff --git
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index bcd721c16..a6bbceb58 100644
---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -174,4 +174,11 @@ object FlinkEnvConfiguration {
val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)
+ val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")
+
+ val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
+
+ val FLINK_ENV_JAVA_OPTS =
+ CommonVars("flink.env.java.opts", "env.java.opts")
+
}
diff --git
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 1b9759d84..c3da25380 100644
---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -35,7 +35,7 @@ import
org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import
org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext,
FlinkEngineConnContext}
import org.apache.linkis.engineconnplugin.flink.setting.Settings
-import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
+import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil,
FlinkValueFormatUtil, ManagerUtil}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.CheckpointingMode
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.yarn.configuration.{YarnConfigOptions,
YarnDeploymentTarget}
-import java.io.File
+import java.io.{File, FileNotFoundException}
import java.net.URL
import java.text.MessageFormat
import java.time.Duration
@@ -63,8 +63,10 @@ import java.util
import java.util.{Collections, Locale}
import scala.collection.JavaConverters._
+import scala.io.Source
import com.google.common.collect.{Lists, Sets}
+import org.yaml.snakeyaml.Yaml
class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with
Logging {
@@ -196,7 +198,15 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
numberOfTaskSlots)
// set extra configs
options.asScala.filter { case (key, _) =>
key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
- case (key, value) =>
flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
+ case (key, value) =>
+ var flinkConfigValue = value
+ if (
+ FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
+ .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
+ ) {
+ flinkConfigValue = getExtractJavaOpts(value)
+ }
+ flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length),
flinkConfigValue)
}
// set kerberos config
if (FLINK_KERBEROS_ENABLE.getValue(options)) {
@@ -295,6 +305,44 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
context
}
+ private def getExtractJavaOpts(envJavaOpts: String): String = {
+ var defaultJavaOpts = ""
+ val yamlFilePath = FLINK_CONF_DIR.getValue
+ val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
+ if (new File(yamlFile).exists()) {
+ val source = Source.fromFile(yamlFile)
+ try {
+ val yamlContent = source.mkString
+ val yaml = new Yaml()
+ val configMap = yaml.loadAs(yamlContent,
classOf[util.LinkedHashMap[String, Object]])
+ if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
+ defaultJavaOpts =
configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
+ }
+ } finally {
+ source.close()
+ }
+ } else {
+ val inputStream = getClass.getResourceAsStream(yamlFile)
+ if (inputStream != null) {
+ val source = Source.fromInputStream(inputStream)
+ try {
+ val yamlContent = source.mkString
+ val yaml = new Yaml()
+ val configMap = yaml.loadAs(yamlContent,
classOf[util.LinkedHashMap[String, Object]])
+ if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
+ defaultJavaOpts =
configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
+ }
+ } finally {
+ source.close()
+ }
+ } else {
+ throw new FileNotFoundException("YAML file not found in both file
system and classpath.")
+ }
+ }
+ val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts,
envJavaOpts)
+ merged
+ }
+
protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
val engineConnModeLabel = getEngineConnModeLabel(labels)
engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
diff --git
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
index 62782507e..0160e97ea 100644
---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
+++
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
@@ -36,4 +36,61 @@ object FlinkValueFormatUtil {
case _ => null
}
+ def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String):
String = {
+ val patternX = """-XX:([^\s]+)=([^\s]+)""".r
+ val keyValueMapX = patternX
+ .findAllMatchIn(envJavaOpts)
+ .map { matchResult =>
+ val key = matchResult.group(1)
+ val value = matchResult.group(2)
+ (key, value)
+ }
+ .toMap
+
+ val patternD = """-D([^\s]+)=([^\s]+)""".r
+ val keyValueMapD = patternD
+ .findAllMatchIn(envJavaOpts)
+ .map { matchResult =>
+ val key = matchResult.group(1)
+ val value = matchResult.group(2)
+ (key, value)
+ }
+ .toMap
+ val xloggcPattern = """-Xloggc:[^\s]+""".r
+ val xloggcValueStr1 =
xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
+ val xloggcValueStr2 =
xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
+ var escapedXloggcValue = ""
+ var replaceStr1 = ""
+ var replaceStr2 = ""
+ if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
+ escapedXloggcValue = xloggcValueStr2.replace("\\<", "<").replace("\\>",
">")
+ replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1,
escapedXloggcValue)
+ replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
+ }
+ if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
+ escapedXloggcValue = xloggcValueStr1.replace("\\<", "<").replace("\\>",
">")
+ replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1,
escapedXloggcValue)
+ replaceStr2 = envJavaOpts
+ }
+ if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
+ replaceStr1 = defaultJavaOpts
+ replaceStr2 = envJavaOpts
+ }
+ val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
+ val (key, value) = entry
+ val oldValue = s"$key=[^\\s]+"
+ val newValue = key + "=" + value
+ result.replaceAll(oldValue, newValue)
+ }
+
+ val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry)
=>
+ val (key, value) = entry
+ val oldValue = s"$key=[^\\s]+"
+ val newValue = key + "=" + value
+ result.replaceAll(oldValue, newValue)
+ }
+ val javaOpts = (MergedStringD.split("\\s+") ++
replaceStr2.split("\\s+")).distinct.mkString(" ")
+ javaOpts
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]