This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch typesafe in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit bc62552b57f289876f185740e5582ef7672f944b Author: benjobs <[email protected]> AuthorDate: Sat Nov 5 23:27:34 2022 +0800 [feature] HOCON conf file support --- pom.xml | 9 ++- streampark-common/pom.xml | 5 ++ .../streampark/common/util/PropertiesUtils.scala | 66 +++++++++++++--------- .../console/core/entity/ApplicationConfig.java | 21 ++++++- .../core/service/impl/ApplicationServiceImpl.java | 3 +- .../flink/core/FlinkStreamingInitializer.scala | 19 ++++--- .../flink/submit/bean/SubmitRequest.scala | 11 ++-- 7 files changed, 88 insertions(+), 46 deletions(-) diff --git a/pom.xml b/pom.xml index eca63cfda..83349b842 100644 --- a/pom.xml +++ b/pom.xml @@ -114,6 +114,7 @@ <mysql.version>8.0.16</mysql.version> <hikariCP.version>3.4.5</hikariCP.version> <snakeyaml.version>1.32</snakeyaml.version> + <typesafe-conf.version>1.4.2</typesafe-conf.version> <json4s-jackson.version>3.7.0-M2</json4s-jackson.version> <hbase-client.version>1.3.5</hbase-client.version> <commons-cli.version>1.3.1</commons-cli.version> @@ -229,6 +230,12 @@ <version>${hikariCP.version}</version> </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + <version>${typesafe-conf.version}</version> + </dependency> + <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> @@ -572,7 +579,7 @@ <version>${jupiter.version}</version> <scope>test</scope> </dependency> - + <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-params</artifactId> diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml index 2b4163d36..43b12511b 100644 --- a/streampark-common/pom.xml +++ b/streampark-common/pom.xml @@ -80,6 +80,11 @@ <artifactId>snakeyaml</artifactId> </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-jackson_${scala.binary.version}</artifactId> diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index f590bc848..2c0142ace 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -16,6 +16,7 @@ */ package org.apache.streampark.common.util +import com.typesafe.config.ConfigFactory import org.yaml.snakeyaml.Yaml import java.io._ @@ -76,6 +77,16 @@ object PropertiesUtils extends Logger { } } + def fromHoconText(conf: String): Map[String, String] = { + try { + val reader = new StringReader(conf) + val config = ConfigFactory.parseReader(reader) + config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap + } catch { + case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e) + } + } + def fromPropertiesText(conf: String): Map[String, String] = { try { val properties = new Properties() @@ -92,17 +103,14 @@ object PropertiesUtils extends Logger { require(file.exists(), s"[StreamPark] fromYamlFile: Yaml file $file does not exist") require(file.isFile, s"[StreamPark] fromYamlFile: Yaml file $file is not a normal file") val inputStream: InputStream = new FileInputStream(file) - try { - val map = MutableMap[String, String]() - new Yaml() - .load(inputStream) - .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap - } catch { - case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e) - } finally { - inputStream.close() - } + fromYamlFile(inputStream) + } + + def fromHoconFile(filename: String): Map[String, String] = { + val file = new File(filename) + require(file.exists(), s"[StreamPark] fromHoconFile: file $file does not exist") + val inputStream = new FileInputStream(file) + fromHoconFile(inputStream) } /** Load properties present in the given file. */ @@ -110,17 +118,8 @@ object PropertiesUtils extends Logger { val file = new File(filename) require(file.exists(), s"[StreamPark] fromPropertiesFile: Properties file $file does not exist") require(file.isFile, s"[StreamPark] fromPropertiesFile: Properties file $file is not a normal file") - - val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8") - try { - val properties = new Properties() - properties.load(inReader) - properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap - } catch { - case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e) - } finally { - inReader.close() - } + val inputStream = new FileInputStream(file) + fromPropertiesFile(inputStream) } /** Load Yaml present in the given file. */ @@ -139,6 +138,17 @@ object PropertiesUtils extends Logger { } } + def fromHoconFile(inputStream: InputStream): Map[String, String] = { + require(inputStream != null, s"[StreamPark] fromHoconFile: Hocon inputStream must not be null") + try { + val reader = new InputStreamReader(inputStream) + val config = ConfigFactory.parseReader(reader) + config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap + } catch { + case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e) + } + } + /** Load properties present in the given file. */ def fromPropertiesFile(inputStream: InputStream): Map[String, String] = { require(inputStream != null, s"[StreamPark] fromPropertiesFile: Properties inputStream must not be null") @@ -153,18 +163,20 @@ object PropertiesUtils extends Logger { def fromYamlTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlText(text).asJava) - def fromPropertiesTextAsJava(conf: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(conf).asJava) + def fromHoconTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconText(text).asJava) + + def fromPropertiesTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(text).asJava) - /** Load Yaml present in the given file. */ def fromYamlFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(filename).asJava) - /** Load properties present in the given file. */ + def fromHoconFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(filename).asJava) + def fromPropertiesFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(filename).asJava) - /** Load Yaml present in the given file. */ def fromYamlFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(inputStream).asJava) - /** Load properties present in the given file. */ + def fromHoconFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(inputStream).asJava) + def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(inputStream).asJava) /** diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java index 4a50cae9f..db660043f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java @@ -44,7 +44,8 @@ public class ApplicationConfig { /** * 1)yaml <br> - * 2)prop + * 2)prop <br> + * 3)hocon */ private Integer format; @@ -74,14 +75,30 @@ public class ApplicationConfig { } public Map<String, String> readConfig() { - switch (this.getFormat()) { + switch (this.format) { case 1: return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content)); case 2: return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content)); + case 3: + return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content)); default: break; } return null; } + + public String configType() { + switch (this.format) { + case 1: + return "yaml"; + case 2: + return "prop"; + case 3: + return "conf"; + default: + throw new IllegalArgumentException("getConfigType error, format must be (1|2|3), detail: 1:yaml, 2:properties, 3:hocon"); + } + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 1b2996966..47e86973a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1262,8 +1262,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli } else { switch (application.getApplicationType()) { case STREAMPARK_FLINK: - String format = applicationConfig.getFormat() == 1 ? "yaml" : "prop"; - appConf = String.format("%s://%s", format, applicationConfig.getContent()); + appConf = String.format("%s://%s", applicationConfig.configType(), applicationConfig.getContent()); break; case APACHE_FLINK: appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 8c84b3719..8189a034c 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -102,29 +102,30 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api def parseConfig(config: String): Map[String, String] = { val extension = config.split("\\.").last.toLowerCase + lazy val content = DeflaterUtils.unzipString(config.drop(7)) val map = config match { - case x if x.startsWith("yaml://") => - PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7))) - case x if x.startsWith("prop://") => - PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7))) + case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content) + case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content) + case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content) case x if x.startsWith("hdfs://") => - /** * If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir */ val text = HdfsUtils.read(x) extension match { - case "properties" => PropertiesUtils.fromPropertiesText(text) case "yml" | "yaml" => PropertiesUtils.fromYamlText(text) + case "conf" => PropertiesUtils.fromHoconText(text) + case "properties" => PropertiesUtils.fromPropertiesText(text) case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml") } case _ => val configFile = new File(config) - require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!") + require(configFile.exists(), s"[StreamPark] Usage: flink.conf file $configFile is not found!!!") extension match { - case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath) case "yml" | "yaml" => PropertiesUtils.fromYamlFile(configFile.getAbsolutePath) - case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml") + case "conf" => PropertiesUtils.fromHoconFile(configFile.getAbsolutePath) + case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath) + case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)") } } map.filter(_._2.nonEmpty) diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala index d25311146..601fc8ade 100644 --- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala @@ -104,11 +104,11 @@ case class SubmitRequest(flinkVersion: FlinkVersion, private[this] def getParameterMap(prefix: String = ""): Map[String, String] = { if (this.appConf == null) Map.empty[String, String] else { + lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7)) val map = this.appConf match { - case x if x.trim.startsWith("yaml://") => - PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.trim.drop(7))) - case x if x.trim.startsWith("prop://") => - PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.trim.drop(7))) + case x if x.trim.startsWith("yaml://") => PropertiesUtils.fromYamlText(content) + case x if x.trim.startsWith("conf://") => PropertiesUtils.fromHoconText(content) + case x if x.trim.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content) case x if x.trim.startsWith("hdfs://") => /* * 如果配置文件为hdfs方式,则需要用户将hdfs相关配置文件copy到resources下... @@ -116,8 +116,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion, val text = HdfsUtils.read(this.appConf) val extension = this.appConf.split("\\.").last.toLowerCase extension match { - case "properties" => PropertiesUtils.fromPropertiesText(text) case "yml" | "yaml" => PropertiesUtils.fromYamlText(text) + case "conf" => PropertiesUtils.fromHoconText(text) + case "properties" => PropertiesUtils.fromPropertiesText(text) case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml") } case x if x.trim.startsWith("json://") =>
