This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b6da1f4d [KYUUBI #2333][KYUUBI #2554] Configuring Flink Engine heap
memory and java opts
6b6da1f4d is described below
commit 6b6da1f4d05b37450e4c6721b825a1e22abed298
Author: jiaoqingbo <[email protected]>
AuthorDate: Tue May 10 12:31:01 2022 +0800
[KYUUBI #2333][KYUUBI #2554] Configuring Flink Engine heap memory and java
opts
### _Why are the changes needed?_
fix #2554
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2579 from jiaoqingbo/kyuubi2554.
Closes #2333
Closes #2554
f0365c91 [jiaoqingbo] code review
1700aab9 [jiaoqingbo] code review
1ca10a65 [jiaoqingbo] fix ut failed
b53dcdd4 [jiaoqingbo] code review
f9ceb72c [jiaoqingbo] [KYUUBI #2554] Configuring Flink Engine heap memory
and java opts
Authored-by: jiaoqingbo <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
conf/kyuubi-env.sh.template | 2 ++
docs/deployment/engine_on_yarn.md | 23 +++++++++++++++++-
docs/deployment/settings.md | 5 ++++
.../it/flink/operation/FlinkOperationSuite.scala | 2 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 22 +++++++++++++++++
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 28 +++++++++++++++-------
.../engine/flink/FlinkProcessBuilderSuite.scala | 25 ++++++++++++-------
7 files changed, 88 insertions(+), 19 deletions(-)
diff --git a/conf/kyuubi-env.sh.template b/conf/kyuubi-env.sh.template
index ed2510dca..2b7be6fc8 100755
--- a/conf/kyuubi-env.sh.template
+++ b/conf/kyuubi-env.sh.template
@@ -46,6 +46,7 @@
# - FLINK_HOME Flink distribution which you would like to use in
Kyuubi.
# - FLINK_CONF_DIR Optional directory where the Flink configuration
lives.
# (Default: $FLINK_HOME/conf)
+# - FLINK_HADOOP_CLASSPATH Required Hadoop jars when you use the Kyuubi Flink
engine.
# - HIVE_HOME Hive distribution which you would like to use in
Kyuubi.
# - HIVE_CONF_DIR Optional directory where the Hive configuration
lives.
# (Default: $HIVE_HOME/conf)
@@ -59,6 +60,7 @@
# export SPARK_HOME=/opt/spark
# export FLINK_HOME=/opt/flink
# export HIVE_HOME=/opt/hive
+# export
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
# export
HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
# export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
# export YARN_CONF_DIR=/usr/ndp/current/yarn/conf
diff --git a/docs/deployment/engine_on_yarn.md
b/docs/deployment/engine_on_yarn.md
index 9c94bb6f6..e3c0c0ba1 100644
--- a/docs/deployment/engine_on_yarn.md
+++ b/docs/deployment/engine_on_yarn.md
@@ -169,12 +169,33 @@ export HADOOP_CLASSPATH=`hadoop classpath`
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
```
-If the `TopSpeedWindowing` passes, configure it in
`$KYUUBI_HOME/conf/kyuubi-env.sh` or `$FLINK_HOME/bin/config.sh`, e.g.
+If the `TopSpeedWindowing` passes, configure it in
`$KYUUBI_HOME/conf/kyuubi-env.sh`
```bash
$ echo "export HADOOP_CONF_DIR=/path/to/hadoop/conf" >>
$KYUUBI_HOME/conf/kyuubi-env.sh
```
+#### Required Environment Variable
+
+The environment variable `FLINK_HADOOP_CLASSPATH` is also required.
+
+For users who are using Hadoop 3.x, Hadoop shaded client is recommended
instead of Hadoop vanilla jars.
+For users who are using Hadoop 2.x, `FLINK_HADOOP_CLASSPATH` should be set to
the output of `hadoop classpath` to use Hadoop
+vanilla jars. For users who do not use Hadoop services (e.g. HDFS, YARN) at
all, Hadoop client jars
+are still required, and it is recommended to use the Hadoop shaded client, as
Hadoop 3.x users do.
+
+See [HADOOP-11656](https://issues.apache.org/jira/browse/HADOOP-11656) for
details of Hadoop shaded client.
+
+To use Hadoop shaded client, please configure $KYUUBI_HOME/conf/kyuubi-env.sh
as follows:
+
+```bash
+$ echo "export
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar"
>> $KYUUBI_HOME/conf/kyuubi-env.sh
+```
+To use Hadoop vanilla jars, please configure $KYUUBI_HOME/conf/kyuubi-env.sh
as follows:
+
+```bash
+$ echo "export FLINK_HADOOP_CLASSPATH=`hadoop classpath`" >>
$KYUUBI_HOME/conf/kyuubi-env.sh
+```
### Deployment Modes Supported by Flink on YARN
For experiment use, we recommend deploying Kyuubi Flink SQL engine in [Session
Mode](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/yarn/#session-mode).
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ddb3aa792..31533e681 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -81,6 +81,7 @@ You can configure the environment variables in
`$KYUUBI_HOME/conf/kyuubi-env.sh`
# - FLINK_HOME Flink distribution which you would like to use in
Kyuubi.
# - FLINK_CONF_DIR Optional directory where the Flink configuration
lives.
# (Default: $FLINK_HOME/conf)
+# - FLINK_HADOOP_CLASSPATH Required Hadoop jars when you use the Kyuubi Flink
engine.
# - HIVE_HOME Hive distribution which you would like to use in
Kyuubi.
# - HIVE_CONF_DIR Optional directory where the Hive configuration
lives.
# (Default: $HIVE_HOME/conf)
@@ -94,6 +95,7 @@ You can configure the environment variables in
`$KYUUBI_HOME/conf/kyuubi-env.sh`
# export SPARK_HOME=/opt/spark
# export FLINK_HOME=/opt/flink
# export HIVE_HOME=/opt/hive
+# export
FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
# export
HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
# export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
# export YARN_CONF_DIR=/usr/ndp/current/yarn/conf
@@ -207,6 +209,9 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.engine.deregister.job.max.failures</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Number of failures of job
before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div
style='width: 20pt'>1.2.0</div>
<code>kyuubi.engine.event.json.log.path</code>|<div style='width:
65pt;word-wrap: break-word;white-space:
normal'>file:///tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>The location of all the engine events go for
the builtin JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS
Path: start with 'hdfs://'</li></ul></div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
<code>kyuubi.engine.event.loggers</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>SPARK</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>A comma separated list of engine history
loggers, where engine/session/operation etc events go. We use spark logger by
default.<ul> <li>SPARK: the events will be written to the spark listener
bus.</li> <li>JSON: the events will be written to the location of
kyuubi.engine.event.json.log.path</li> <li>JDB [...]
+<code>kyuubi.engine.flink.extra.classpath</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra
classpath for the flink sql engine, for configuring location of hadoop client
jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.6.0</div>
+<code>kyuubi.engine.flink.java.options</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java
options for the flink sql engine</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
+<code>kyuubi.engine.flink.memory</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>The heap memory for the flink sql
engine</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.6.0</div>
<code>kyuubi.engine.hive.extra.classpath</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra
classpath for the hive query engine, for configuring location of hadoop client
jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.6.0</div>
<code>kyuubi.engine.hive.java.options</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java
options for the hive query engine</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.hive.memory</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>The heap memory for the hive query
engine</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.6.0</div>
diff --git
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index 4d2c8dfd7..be79e8e62 100644
---
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -40,7 +40,7 @@ class FlinkOperationSuite extends
WithKyuubiServerAndFlinkMiniCluster with HiveJ
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.parallelism.default", "6")
.set(
- s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
+ s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_HADOOP_CLASSPATH",
s"$hadoopClasspath${File.separator}" +
s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 23a947e78..49c6a649f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1437,6 +1437,28 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
+ buildConf("kyuubi.engine.flink.memory")
+ .doc("The heap memory for the flink sql engine")
+ .version("1.6.0")
+ .stringConf
+ .createWithDefault("1g")
+
+ val ENGINE_FLINK_JAVA_OPTIONS: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.flink.java.options")
+ .doc("The extra java options for the flink sql engine")
+ .version("1.6.0")
+ .stringConf
+ .createOptional
+
+ val ENGINE_FLINK_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.flink.extra.classpath")
+ .doc("The extra classpath for the flink sql engine, for configuring
location" +
+ " of hadoop client jars, etc")
+ .version("1.6.0")
+ .stringConf
+ .createOptional
+
val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.limit.connections.per.user")
.doc("Maximum kyuubi server connections per user." +
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index c9b7e7f19..5a61cd9cf 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.operation.log.OperationLog
@@ -41,6 +42,8 @@ class FlinkProcessBuilder(
private val flinkHome: String = getEngineHome(shortName)
+ private val FLINK_HADOOP_CLASSPATH: String = "FLINK_HADOOP_CLASSPATH"
+
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String =
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -49,9 +52,12 @@ class FlinkProcessBuilder(
val buffer = new ArrayBuffer[String]()
buffer += executable
- // TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or
kyuubi.engine.flink.memory to configure
- // -Xmx5g
- // java options
+ val memory = conf.get(ENGINE_FLINK_MEMORY)
+ buffer += s"-Xmx$memory"
+ val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+ if (javaOptions.isDefined) {
+ buffer += javaOptions.get
+ }
buffer += "-cp"
val classpathEntries = new LinkedHashSet[String]
@@ -77,13 +83,17 @@ class FlinkProcessBuilder(
env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
- val hadoopClasspath = env.get("HADOOP_CLASSPATH")
- if (hadoopClasspath.isEmpty) {
- throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
- "For more detail information on configuring HADOOP_CLASSPATH" +
-
"https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
+ val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH)
+ hadoopCp.foreach(classpathEntries.add)
+ val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+ extraCp.foreach(classpathEntries.add)
+ if (hadoopCp.isEmpty && extraCp.isEmpty) {
+ throw new KyuubiException(s"The conf of ${FLINK_HADOOP_CLASSPATH} and " +
+ s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty." +
+ s"Please set ${FLINK_HADOOP_CLASSPATH} or
${ENGINE_FLINK_EXTRA_CLASSPATH.key} for " +
+ s"configuring location of hadoop client jars, etc")
}
- classpathEntries.add(hadoopClasspath.get)
+
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 648a8f472..187af116b 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -22,26 +22,33 @@ import java.io.File
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
-import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite,
KyuubiSQLException, SCALA_COMPILE_VERSION}
+import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiException,
KyuubiFunSuite, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_EXTRA_CLASSPATH,
ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def conf = KyuubiConf().set("kyuubi.on", "off")
+ .set(ENGINE_FLINK_MEMORY, "512m")
+ .set(
+ ENGINE_FLINK_JAVA_OPTIONS,
+ "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
+
private def envDefault: ListMap[String, String] = ListMap(
- "JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
+ "JAVA_HOME" -> s"${File.separator}jdk")
private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
private def envWithAllHadoop: ListMap[String, String] =
envWithoutHadoopCLASSPATH +
- ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+ ("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
private def confStr: String = {
conf.getAll.map { case (k, v) => s"\\\n\t--conf $k=$v" }.mkString(" ")
}
private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
val actualCommands = builder.toString
val classpathStr: String = constructClasspathStr(builder)
- val expectedCommands = s"$javaPath " +
+ val expectedCommands = s"$javaPath -Xmx512m " +
+ s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 " +
s"-cp $classpathStr $mainClassStr \\\n\t--conf
kyuubi.session.user=vinoyang " +
s"$confStr"
info(s"\n\n actualCommands $actualCommands")
@@ -71,6 +78,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
classpathEntries.add(v)
}
}
+ val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+ extraCp.foreach(classpathEntries.add)
val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
classpathStr
}
@@ -90,17 +99,17 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
compareActualAndExpected(builder)
}
- test("all hadoop related environment variables are configured except
HADOOP_CLASSPATH") {
+ test("all hadoop related environment variables are configured except
FLINK_HADOOP_CLASSPATH") {
val builder = new FlinkProcessBuilder("vinoyang", conf) {
override def env: Map[String, String] = envWithoutHadoopCLASSPATH
}
- assertThrows[KyuubiSQLException](builder.toString)
+ assertThrows[KyuubiException](builder.toString)
}
- test("only HADOOP_CLASSPATH environment variables are configured") {
+ test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {
val builder = new FlinkProcessBuilder("vinoyang", conf) {
override def env: Map[String, String] = envDefault +
- ("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
+ ("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
}
compareActualAndExpected(builder)
}