This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6da1f4d [KYUUBI #2333][KYUUBI #2554] Configuring Flink Engine heap 
memory and java opts
6b6da1f4d is described below

commit 6b6da1f4d05b37450e4c6721b825a1e22abed298
Author: jiaoqingbo <[email protected]>
AuthorDate: Tue May 10 12:31:01 2022 +0800

    [KYUUBI #2333][KYUUBI #2554] Configuring Flink Engine heap memory and java 
opts
    
    ### _Why are the changes needed?_
    
    fix #2554
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #2579 from jiaoqingbo/kyuubi2554.
    
    Closes #2333
    
    Closes #2554
    
    f0365c91 [jiaoqingbo] code review
    1700aab9 [jiaoqingbo] code review
    1ca10a65 [jiaoqingbo] fix ut failed
    b53dcdd4 [jiaoqingbo] code review
    f9ceb72c [jiaoqingbo] [KYUUBI #2554] Configuring Flink Engine heap memory 
and java opts
    
    Authored-by: jiaoqingbo <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 conf/kyuubi-env.sh.template                        |  2 ++
 docs/deployment/engine_on_yarn.md                  | 23 +++++++++++++++++-
 docs/deployment/settings.md                        |  5 ++++
 .../it/flink/operation/FlinkOperationSuite.scala   |  2 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 22 +++++++++++++++++
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  | 28 +++++++++++++++-------
 .../engine/flink/FlinkProcessBuilderSuite.scala    | 25 ++++++++++++-------
 7 files changed, 88 insertions(+), 19 deletions(-)

diff --git a/conf/kyuubi-env.sh.template b/conf/kyuubi-env.sh.template
index ed2510dca..2b7be6fc8 100755
--- a/conf/kyuubi-env.sh.template
+++ b/conf/kyuubi-env.sh.template
@@ -46,6 +46,7 @@
 # - FLINK_HOME              Flink distribution which you would like to use in 
Kyuubi.
 # - FLINK_CONF_DIR          Optional directory where the Flink configuration 
lives.
 #                           (Default: $FLINK_HOME/conf)
+# - FLINK_HADOOP_CLASSPATH  Required Hadoop jars when you use the Kyuubi Flink 
engine.
 # - HIVE_HOME               Hive distribution which you would like to use in 
Kyuubi.
 # - HIVE_CONF_DIR           Optional directory where the Hive configuration 
lives.
 #                           (Default: $HIVE_HOME/conf)
@@ -59,6 +60,7 @@
 # export SPARK_HOME=/opt/spark
 # export FLINK_HOME=/opt/flink
 # export HIVE_HOME=/opt/hive
+# export 
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
 # export 
HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
 # export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
 # export YARN_CONF_DIR=/usr/ndp/current/yarn/conf
diff --git a/docs/deployment/engine_on_yarn.md 
b/docs/deployment/engine_on_yarn.md
index 9c94bb6f6..e3c0c0ba1 100644
--- a/docs/deployment/engine_on_yarn.md
+++ b/docs/deployment/engine_on_yarn.md
@@ -169,12 +169,33 @@ export HADOOP_CLASSPATH=`hadoop classpath`
 echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
  ```
 
-If the `TopSpeedWindowing` passes, configure it in 
`$KYUUBI_HOME/conf/kyuubi-env.sh` or `$FLINK_HOME/bin/config.sh`, e.g.
+If the `TopSpeedWindowing` passes, configure it in 
`$KYUUBI_HOME/conf/kyuubi-env.sh`
 
 ```bash
 $ echo "export HADOOP_CONF_DIR=/path/to/hadoop/conf" >> 
$KYUUBI_HOME/conf/kyuubi-env.sh
 ```
 
+#### Required Environment Variable
+
+The environment variable `FLINK_HADOOP_CLASSPATH` is also required.
+
+For users who are using Hadoop 3.x, Hadoop shaded client is recommended 
instead of Hadoop vanilla jars. 
+For users who are using Hadoop 2.x, `FLINK_HADOOP_CLASSPATH` should be set to 
hadoop classpath to use Hadoop 
+vanilla jars. For users who do not use Hadoop services (e.g. HDFS, YARN) at 
all, Hadoop client jars 
+are still required; using the Hadoop shaded client, as Hadoop 3.x users do, is 
recommended.
+
+See [HADOOP-11656](https://issues.apache.org/jira/browse/HADOOP-11656) for 
details of Hadoop shaded client.
+
+To use Hadoop shaded client, please configure $KYUUBI_HOME/conf/kyuubi-env.sh 
as follows:
+
+```bash
+$ echo "export 
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar"
 >> $KYUUBI_HOME/conf/kyuubi-env.sh
+```
+To use Hadoop vanilla jars, please configure $KYUUBI_HOME/conf/kyuubi-env.sh 
as follows:
+
+```bash
+$ echo "export FLINK_HADOOP_CLASSPATH=`hadoop classpath`" >> 
$KYUUBI_HOME/conf/kyuubi-env.sh
+```
 ### Deployment Modes Supported by Flink on YARN
 
 For experiment use, we recommend deploying Kyuubi Flink SQL engine in [Session 
Mode](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/yarn/#session-mode).
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ddb3aa792..31533e681 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -81,6 +81,7 @@ You can configure the environment variables in 
`$KYUUBI_HOME/conf/kyuubi-env.sh`
 # - FLINK_HOME              Flink distribution which you would like to use in 
Kyuubi.
 # - FLINK_CONF_DIR          Optional directory where the Flink configuration 
lives.
 #                           (Default: $FLINK_HOME/conf)
+# - FLINK_HADOOP_CLASSPATH  Required Hadoop jars when you use the Kyuubi Flink 
engine.
 # - HIVE_HOME               Hive distribution which you would like to use in 
Kyuubi.
 # - HIVE_CONF_DIR           Optional directory where the Hive configuration 
lives.
 #                           (Default: $HIVE_HOME/conf)
@@ -94,6 +95,7 @@ You can configure the environment variables in 
`$KYUUBI_HOME/conf/kyuubi-env.sh`
 # export SPARK_HOME=/opt/spark
 # export FLINK_HOME=/opt/flink
 # export HIVE_HOME=/opt/hive
+# export 
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
 # export 
HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
 # export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
 # export YARN_CONF_DIR=/usr/ndp/current/yarn/conf
@@ -207,6 +209,9 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.engine.deregister.job.max.failures</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Number of failures of job 
before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div 
style='width: 20pt'>1.2.0</div>
 <code>kyuubi.engine.event.json.log.path</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: 
normal'>file:///tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>The location of all the engine events go for 
the builtin JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS 
Path: start with 'hdfs://'</li></ul></div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
 <code>kyuubi.engine.event.loggers</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>SPARK</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>A comma separated list of engine history 
loggers, where engine/session/operation etc events go. We use spark logger by 
default.<ul> <li>SPARK: the events will be written to the spark listener 
bus.</li> <li>JSON: the events will be written to the location of 
kyuubi.engine.event.json.log.path</li> <li>JDB [...]
+<code>kyuubi.engine.flink.extra.classpath</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra 
classpath for the flink sql engine, for configuring location of hadoop client 
jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.6.0</div>
+<code>kyuubi.engine.flink.java.options</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java 
options for the flink sql engine</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
+<code>kyuubi.engine.flink.memory</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>The heap memory for the flink sql 
engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.6.0</div>
 <code>kyuubi.engine.hive.extra.classpath</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra 
classpath for the hive query engine, for configuring location of hadoop client 
jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.6.0</div>
 <code>kyuubi.engine.hive.java.options</code>|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java 
options for the hive query engine</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
 <code>kyuubi.engine.hive.memory</code>|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>The heap memory for the hive query 
engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.6.0</div>
diff --git 
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
 
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 4d2c8dfd7..be79e8e62 100644
--- 
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ 
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -40,7 +40,7 @@ class FlinkOperationSuite extends 
WithKyuubiServerAndFlinkMiniCluster with HiveJ
     .set(ENGINE_TYPE, "FLINK_SQL")
     .set("flink.parallelism.default", "6")
     .set(
-      s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
+      s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_HADOOP_CLASSPATH",
       s"$hadoopClasspath${File.separator}" +
         s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
         
s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 23a947e78..49c6a649f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1437,6 +1437,28 @@ object KyuubiConf {
       .stringConf
       .createOptional
 
+  val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
+    buildConf("kyuubi.engine.flink.memory")
+      .doc("The heap memory for the flink sql engine")
+      .version("1.6.0")
+      .stringConf
+      .createWithDefault("1g")
+
+  val ENGINE_FLINK_JAVA_OPTIONS: OptionalConfigEntry[String] =
+    buildConf("kyuubi.engine.flink.java.options")
+      .doc("The extra java options for the flink sql engine")
+      .version("1.6.0")
+      .stringConf
+      .createOptional
+
+  val ENGINE_FLINK_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
+    buildConf("kyuubi.engine.flink.extra.classpath")
+      .doc("The extra classpath for the flink sql engine, for configuring 
location" +
+        " of hadoop client jars, etc")
+      .version("1.6.0")
+      .stringConf
+      .createOptional
+
   val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
     buildConf("kyuubi.server.limit.connections.per.user")
       .doc("Maximum kyuubi server connections per user." +
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index c9b7e7f19..5a61cd9cf 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.operation.log.OperationLog
@@ -41,6 +42,8 @@ class FlinkProcessBuilder(
 
   private val flinkHome: String = getEngineHome(shortName)
 
+  private val FLINK_HADOOP_CLASSPATH: String = "FLINK_HADOOP_CLASSPATH"
+
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = 
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -49,9 +52,12 @@ class FlinkProcessBuilder(
     val buffer = new ArrayBuffer[String]()
     buffer += executable
 
-    // TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or 
kyuubi.engine.flink.memory to configure
-    // -Xmx5g
-    // java options
+    val memory = conf.get(ENGINE_FLINK_MEMORY)
+    buffer += s"-Xmx$memory"
+    val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+    if (javaOptions.isDefined) {
+      buffer += javaOptions.get
+    }
 
     buffer += "-cp"
     val classpathEntries = new LinkedHashSet[String]
@@ -77,13 +83,17 @@ class FlinkProcessBuilder(
     env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
     env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
     env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
-    val hadoopClasspath = env.get("HADOOP_CLASSPATH")
-    if (hadoopClasspath.isEmpty) {
-      throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
-        "For more detail information on configuring HADOOP_CLASSPATH" +
-        
"https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments";)
+    val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH)
+    hadoopCp.foreach(classpathEntries.add)
+    val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+    extraCp.foreach(classpathEntries.add)
+    if (hadoopCp.isEmpty && extraCp.isEmpty) {
+      throw new KyuubiException(s"The conf of ${FLINK_HADOOP_CLASSPATH} and " +
+        s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty. " +
+        s"Please set ${FLINK_HADOOP_CLASSPATH} or 
${ENGINE_FLINK_EXTRA_CLASSPATH.key} for " +
+        s"configuring location of hadoop client jars, etc")
     }
-    classpathEntries.add(hadoopClasspath.get)
+
     buffer += classpathEntries.asScala.mkString(File.pathSeparator)
     buffer += mainClass
 
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 648a8f472..187af116b 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -22,26 +22,33 @@ import java.io.File
 import scala.collection.JavaConverters._
 import scala.collection.immutable.ListMap
 
-import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite, 
KyuubiSQLException, SCALA_COMPILE_VERSION}
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiException, 
KyuubiFunSuite, SCALA_COMPILE_VERSION}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_EXTRA_CLASSPATH, 
ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
 
 class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def conf = KyuubiConf().set("kyuubi.on", "off")
+    .set(ENGINE_FLINK_MEMORY, "512m")
+    .set(
+      ENGINE_FLINK_JAVA_OPTIONS,
+      "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
+
   private def envDefault: ListMap[String, String] = ListMap(
-    "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+    "JAVA_HOME" -> s"${File.separator}jdk")
   private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
     ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
     ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
     ("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
   private def envWithAllHadoop: ListMap[String, String] = 
envWithoutHadoopCLASSPATH +
-    ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+    ("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
   private def confStr: String = {
     conf.getAll.map { case (k, v) => s"\\\n\t--conf $k=$v" }.mkString(" ")
   }
   private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
     val actualCommands = builder.toString
     val classpathStr: String = constructClasspathStr(builder)
-    val expectedCommands = s"$javaPath " +
+    val expectedCommands = s"$javaPath -Xmx512m " +
+      s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 " +
       s"-cp $classpathStr $mainClassStr \\\n\t--conf 
kyuubi.session.user=vinoyang " +
       s"$confStr"
     info(s"\n\n actualCommands $actualCommands")
@@ -71,6 +78,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
         classpathEntries.add(v)
       }
     }
+    val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+    extraCp.foreach(classpathEntries.add)
     val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
     classpathStr
   }
@@ -90,17 +99,17 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     compareActualAndExpected(builder)
   }
 
-  test("all hadoop related environment variables are configured except 
HADOOP_CLASSPATH") {
+  test("all hadoop related environment variables are configured except 
FLINK_HADOOP_CLASSPATH") {
     val builder = new FlinkProcessBuilder("vinoyang", conf) {
       override def env: Map[String, String] = envWithoutHadoopCLASSPATH
     }
-    assertThrows[KyuubiSQLException](builder.toString)
+    assertThrows[KyuubiException](builder.toString)
   }
 
-  test("only HADOOP_CLASSPATH environment variables are configured") {
+  test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {
     val builder = new FlinkProcessBuilder("vinoyang", conf) {
       override def env: Map[String, String] = envDefault +
-        ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+        ("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
     }
     compareActualAndExpected(builder)
   }

Reply via email to