This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f9f9bbf0 flink load default configuration (#5025)
2f9f9bbf0 is described below

commit 2f9f9bbf0ab01f89a793d79f0672024b401a229d
Author: yangwenzea <[email protected]>
AuthorDate: Fri Dec 8 23:25:58 2023 +0800

    flink load default configuration (#5025)
    
    * flink load default configuration
    
    * fix gc log bug
    
    * code format
---
 .../flink/config/FlinkEnvConfiguration.scala       |  7 +++
 .../flink/factory/FlinkEngineConnFactory.scala     | 54 ++++++++++++++++++--
 .../flink/util/FlinkValueFormatUtil.scala          | 57 ++++++++++++++++++++++
 3 files changed, 115 insertions(+), 3 deletions(-)

diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index bcd721c16..a6bbceb58 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -174,4 +174,11 @@ object FlinkEnvConfiguration {
   val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
     CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)
 
+  val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml")
+
+  val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true)
+
+  val FLINK_ENV_JAVA_OPTS =
+    CommonVars("flink.env.java.opts", "env.java.opts")
+
 }
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 1b9759d84..c3da25380 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -35,7 +35,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
 import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
 import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
 import org.apache.linkis.engineconnplugin.flink.setting.Settings
-import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
+import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
 import org.apache.linkis.manager.engineplugin.common.creation.{
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
 import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URL
 import java.text.MessageFormat
 import java.time.Duration
@@ -63,8 +63,10 @@ import java.util
 import java.util.{Collections, Locale}
 
 import scala.collection.JavaConverters._
+import scala.io.Source
 
 import com.google.common.collect.{Lists, Sets}
+import org.yaml.snakeyaml.Yaml
 
 class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {
 
@@ -196,7 +198,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
     // set extra configs
     options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach {
-      case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
+      case (key, value) =>
+        var flinkConfigValue = value
+        if (
+            FlinkEnvConfiguration.FLINK_YAML_MERGE_ENABLE.getValue && key
+              .equals(FLINK_CONFIG_PREFIX + FLINK_ENV_JAVA_OPTS.getValue)
+        ) {
+          flinkConfigValue = getExtractJavaOpts(value)
+        }
+        flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), flinkConfigValue)
     }
     // set kerberos config
     if (FLINK_KERBEROS_ENABLE.getValue(options)) {
@@ -295,6 +305,44 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     context
   }
 
+  private def getExtractJavaOpts(envJavaOpts: String): String = {
+    var defaultJavaOpts = ""
+    val yamlFilePath = FLINK_CONF_DIR.getValue
+    val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()
+    if (new File(yamlFile).exists()) {
+      val source = Source.fromFile(yamlFile)
+      try {
+        val yamlContent = source.mkString
+        val yaml = new Yaml()
+        val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
+        if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
+          defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
+        }
+      } finally {
+        source.close()
+      }
+    } else {
+      val inputStream = getClass.getResourceAsStream(yamlFile)
+      if (inputStream != null) {
+        val source = Source.fromInputStream(inputStream)
+        try {
+          val yamlContent = source.mkString
+          val yaml = new Yaml()
+          val configMap = yaml.loadAs(yamlContent, classOf[util.LinkedHashMap[String, Object]])
+          if (configMap.containsKey(FLINK_ENV_JAVA_OPTS.getValue)) {
+            defaultJavaOpts = configMap.get(FLINK_ENV_JAVA_OPTS.getValue).toString
+          }
+        } finally {
+          source.close()
+        }
+      } else {
+        throw new FileNotFoundException("YAML file not found in both file system and classpath.")
+      }
+    }
+    val merged = FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
+    merged
+  }
+
   protected def isOnceEngineConn(labels: util.List[Label[_]]): Boolean = {
     val engineConnModeLabel = getEngineConnModeLabel(labels)
     engineConnModeLabel != null && (EngineConnMode.toEngineConnMode(
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
index 62782507e..0160e97ea 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/FlinkValueFormatUtil.scala
@@ -36,4 +36,61 @@ object FlinkValueFormatUtil {
     case _ => null
   }
 
+  def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
+    val patternX = """-XX:([^\s]+)=([^\s]+)""".r
+    val keyValueMapX = patternX
+      .findAllMatchIn(envJavaOpts)
+      .map { matchResult =>
+        val key = matchResult.group(1)
+        val value = matchResult.group(2)
+        (key, value)
+      }
+      .toMap
+
+    val patternD = """-D([^\s]+)=([^\s]+)""".r
+    val keyValueMapD = patternD
+      .findAllMatchIn(envJavaOpts)
+      .map { matchResult =>
+        val key = matchResult.group(1)
+        val value = matchResult.group(2)
+        (key, value)
+      }
+      .toMap
+    val xloggcPattern = """-Xloggc:[^\s]+""".r
+    val xloggcValueStr1 = xloggcPattern.findFirstMatchIn(defaultJavaOpts).getOrElse("").toString
+    val xloggcValueStr2 = xloggcPattern.findFirstMatchIn(envJavaOpts).getOrElse("").toString
+    var escapedXloggcValue = ""
+    var replaceStr1 = ""
+    var replaceStr2 = ""
+    if (xloggcValueStr1.nonEmpty && xloggcValueStr2.nonEmpty) {
+      escapedXloggcValue = xloggcValueStr2.replace("\\<", "<").replace("\\>", ">")
+      replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
+      replaceStr2 = envJavaOpts.replace(xloggcValueStr2, "")
+    }
+    if (xloggcValueStr1.nonEmpty && xloggcValueStr2.isEmpty) {
+      escapedXloggcValue = xloggcValueStr1.replace("\\<", "<").replace("\\>", ">")
+      replaceStr1 = defaultJavaOpts.replace(xloggcValueStr1, escapedXloggcValue)
+      replaceStr2 = envJavaOpts
+    }
+    if (xloggcValueStr1.isEmpty && xloggcValueStr2.isEmpty) {
+      replaceStr1 = defaultJavaOpts
+      replaceStr2 = envJavaOpts
+    }
+    val MergedStringX = keyValueMapX.foldLeft(replaceStr1) { (result, entry) =>
+      val (key, value) = entry
+      val oldValue = s"$key=[^\\s]+"
+      val newValue = key + "=" + value
+      result.replaceAll(oldValue, newValue)
+    }
+
+    val MergedStringD = keyValueMapD.foldLeft(MergedStringX) { (result, entry) =>
+      val (key, value) = entry
+      val oldValue = s"$key=[^\\s]+"
+      val newValue = key + "=" + value
+      result.replaceAll(oldValue, newValue)
+    }
+    val javaOpts = (MergedStringD.split("\\s+") ++ replaceStr2.split("\\s+")).distinct.mkString(" ")
+    javaOpts
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to