This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ffebc647f [KYUUBI #5315][FLINK] Propagate HIVE_CONF_DIR on launching 
Flink engine
ffebc647f is described below

commit ffebc647f8ec338d614d2f48361643880f6235da
Author: kandy01.wang <[email protected]>
AuthorDate: Thu Sep 21 17:53:56 2023 +0800

    [KYUUBI #5315][FLINK] Propagate HIVE_CONF_DIR on launching Flink engine
    
    ### _Why are the changes needed?_
    
    When we create a Hive catalog in the Flink SQL Client as follows:
    
    ```
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-conf-dir' = '/opt/hive-conf'
    )
    ```
    we should propagate the `hive-conf-dir` (by shipping its `hive-site.xml`) and pass `HIVE_CONF_DIR` as an env variable to the AppMaster.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5316 from hadoopkandy/KYUUBI-5315.
    
    Closes #5315
    
    3f12acb59 [kandy01.wang] [KYUUBI #5315] [Improvement] Propagate 
HIVE_CONF_DIR on launching Flink engine
    
    Authored-by: kandy01.wang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/engine/flink/FlinkProcessBuilder.scala  | 15 +++++++++++++++
 .../kyuubi/engine/flink/FlinkProcessBuilderSuite.scala    | 14 ++++++++++----
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 77b9cd6d3..f43adfbc2 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -104,6 +104,16 @@ class FlinkProcessBuilder(
         val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
         userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
 
+        val hiveConfDirOpt = env.get("HIVE_CONF_DIR")
+        hiveConfDirOpt.foreach { hiveConfDir =>
+          val hiveConfFile = Paths.get(hiveConfDir).resolve("hive-site.xml")
+          if (!Files.exists(hiveConfFile)) {
+            throw new KyuubiException(s"The file $hiveConfFile does not 
exists. " +
+              s"Please put hive-site.xml when HIVE_CONF_DIR env $hiveConfDir 
is configured.")
+          }
+          flinkExtraJars += s"$hiveConfFile"
+        }
+
         buffer += "-t"
         buffer += "yarn-application"
         buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
@@ -111,6 +121,10 @@ class FlinkProcessBuilder(
         buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
         buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
 
+        hiveConfDirOpt.foreach { _ =>
+          buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
+        }
+
         val customFlinkConf = conf.getAllWithPrefix("flink", "")
         customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
           buffer += s"-D$k=$v"
@@ -166,6 +180,7 @@ class FlinkProcessBuilder(
         env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
         env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
         env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+        env.get("HIVE_CONF_DIR").foreach(classpathEntries.add)
         val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
         hadoopCp.foreach(classpathEntries.add)
         val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 990d56f15..26e355a87 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -56,6 +56,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, 
"usrlib")).toFile
   private val tempUdfJar =
     Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar"))
+  private val tempHiveDir =
+    Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, 
"hive-conf")).toFile
+  Files.createFile(Paths.get(tempHiveDir.toPath.toString, "hive-site.xml"))
 
   private def envDefault: ListMap[String, String] = ListMap(
     "JAVA_HOME" -> s"${File.separator}jdk",
@@ -63,7 +66,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
     ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
     ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
-    ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
+    ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf") +
+    ("HIVE_CONF_DIR" -> s"$tempHiveDir")
   private def envWithAllHadoop: ListMap[String, String] = 
envWithoutHadoopCLASSPATH +
     (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
   private def confStr: String = {
@@ -89,10 +93,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     val expectedCommands =
       escapePaths(s"${builder.flinkExecutable} run-application ") +
         s"-t yarn-application " +
-        
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar
 " +
+        
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar"
 +
+        s";.*\\/hive-site\\.xml " +
         s"-Dyarn\\.application\\.name=kyuubi_.* " +
         s"-Dyarn\\.tags=KYUUBI " +
         s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
+        s"-Dcontainerized\\.master\\.env\\.HIVE_CONF_DIR=\\. " +
         s"-Dexecution.target=yarn-application " +
         s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " +
         s".*kyuubi-flink-sql-engine_.*jar" +
@@ -151,9 +157,9 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     matchActualAndExpectedSessionMode(builder)
   }
 
-  test("application mode - default env") {
+  test("application mode - all hadoop related environment variables are 
configured") {
     val builder = new FlinkProcessBuilder("paullam", applicationModeConf) {
-      override def env: Map[String, String] = envDefault
+      override def env: Map[String, String] = envWithAllHadoop
     }
     matchActualAndExpectedApplicationMode(builder)
   }

Reply via email to