This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ffebc647f [KYUUBI #5315][FLINK] Propagate HIVE_CONF_DIR on launching Flink engine
ffebc647f is described below
commit ffebc647f8ec338d614d2f48361643880f6235da
Author: kandy01.wang <[email protected]>
AuthorDate: Thu Sep 21 17:53:56 2023 +0800
[KYUUBI #5315][FLINK] Propagate HIVE_CONF_DIR on launching Flink engine
### _Why are the changes needed?_
When creating a Hive catalog in the Flink SQL Client as follows:
```
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
)
```
we should propagate `hive-conf-dir` by shipping `hive-site.xml` to the cluster and passing `HIVE_CONF_DIR` as an environment variable to the AppMaster.
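For illustration, assuming `HIVE_CONF_DIR=/opt/hive-conf` is set in the Kyuubi server's environment (jar names abbreviated), the generated launch command would then contain roughly:
```
flink run-application -t yarn-application \
  -Dyarn.ship-files=flink-sql-client.jar;flink-sql-gateway.jar;/opt/hive-conf/hive-site.xml \
  -Dcontainerized.master.env.FLINK_CONF_DIR=. \
  -Dcontainerized.master.env.HIVE_CONF_DIR=. \
  ...
```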
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5316 from hadoopkandy/KYUUBI-5315.
Closes #5315
3f12acb59 [kandy01.wang] [KYUUBI #5315] [Improvement] Propagate HIVE_CONF_DIR on launching Flink engine
Authored-by: kandy01.wang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/engine/flink/FlinkProcessBuilder.scala | 15 +++++++++++++++
.../kyuubi/engine/flink/FlinkProcessBuilderSuite.scala | 14 ++++++++++----
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 77b9cd6d3..f43adfbc2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -104,6 +104,16 @@ class FlinkProcessBuilder(
val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
+ val hiveConfDirOpt = env.get("HIVE_CONF_DIR")
+ hiveConfDirOpt.foreach { hiveConfDir =>
+ val hiveConfFile = Paths.get(hiveConfDir).resolve("hive-site.xml")
+ if (!Files.exists(hiveConfFile)) {
+ throw new KyuubiException(s"The file $hiveConfFile does not exist. " +
+ s"Please put hive-site.xml under the configured HIVE_CONF_DIR: $hiveConfDir.")
+ }
+ flinkExtraJars += s"$hiveConfFile"
+ }
+
buffer += "-t"
buffer += "yarn-application"
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
@@ -111,6 +121,10 @@ class FlinkProcessBuilder(
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
+ hiveConfDirOpt.foreach { _ =>
+ buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
+ }
+
val customFlinkConf = conf.getAllWithPrefix("flink", "")
customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
buffer += s"-D$k=$v"
@@ -166,6 +180,7 @@ class FlinkProcessBuilder(
env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+ env.get("HIVE_CONF_DIR").foreach(classpathEntries.add)
val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
hadoopCp.foreach(classpathEntries.add)
val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 990d56f15..26e355a87 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -56,6 +56,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
private val tempUdfJar =
Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar"))
+ private val tempHiveDir =
+ Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "hive-conf")).toFile
+ Files.createFile(Paths.get(tempHiveDir.toPath.toString, "hive-site.xml"))
private def envDefault: ListMap[String, String] = ListMap(
"JAVA_HOME" -> s"${File.separator}jdk",
@@ -63,7 +66,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
- ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
+ ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") +
+ ("HIVE_CONF_DIR" -> s"$tempHiveDir")
private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
@@ -89,10 +93,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
val expectedCommands =
escapePaths(s"${builder.flinkExecutable} run-application ") +
s"-t yarn-application " +
- s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar " +
+ s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar" +
+ s";.*\\/hive-site\\.xml " +
s"-Dyarn\\.application\\.name=kyuubi_.* " +
s"-Dyarn\\.tags=KYUUBI " +
s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
+ s"-Dcontainerized\\.master\\.env\\.HIVE_CONF_DIR=\\. " +
s"-Dexecution.target=yarn-application " +
s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " +
s".*kyuubi-flink-sql-engine_.*jar" +
@@ -151,9 +157,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
matchActualAndExpectedSessionMode(builder)
}
- test("application mode - default env") {
+ test("application mode - all hadoop related environment variables are
configured") {
val builder = new FlinkProcessBuilder("paullam", applicationModeConf) {
- override def env: Map[String, String] = envDefault
+ override def env: Map[String, String] = envWithAllHadoop
}
matchActualAndExpectedApplicationMode(builder)
}