[
https://issues.apache.org/jira/browse/FLINK-27130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Adrian Zhong resolved FLINK-27130.
----------------------------------
Resolution: Not A Problem
The best way to pass system properties for client is to set
env.java.opts.client in flink-conf.yaml,or set bash env "FLINK_ENV_JAVA_OPTS"
> unable to pass custom System properties through command line
> ------------------------------------------------------------
>
> Key: FLINK-27130
> URL: https://issues.apache.org/jira/browse/FLINK-27130
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.13.0, 1.13.6
> Reporter: Adrian Zhong
> Priority: Major
>
> I'm using Flink YARN-PER-JOB mode to submit a job.
> I'm doubtful that the way for a job class to read system properties specified
> +through command line+ is unclear.
>
> I have searched all related issues, read unit tests for CliFrontend and
> DynamicProperties, however, this may be not a java problem.
>
> Here is my job class:
> {code:java}
> public static void main(String[] args) {
> String s =
> executionEnv.getConfiguration().get(CoreOptions.FLINK_CLI_JVM_OPTIONS);
> System.err.println("FLINK_CLI_JVM_OPTIONS" + s);
>
> String property = System.getProperty("kafka.start_from_timestamp");
> if (property == null) {
> //-Dkafka.start_from_timestamp=1648828800000
> System.err.println("-Dkafka.start_from_timestamp Not found");
> System.err.println("This are Properties Found in this JVM:");
> System.err.println(System.getProperties().stringPropertyNames());
> } else {
> System.err.println("-Dkafka.start_from_timestamp is" + property);
> } //....
> } {code}
> outputs:
> {code:java}
> FLINK_CLI_JVM_OPTIONS
> -Dkafka.start_from_timestamp Not found
> This are Properties Found in this JVM:
> [zookeeper.sasl.client, java.runtime.name, sun.boot.library.path,
> java.vm.version, java.vm.vendor, java.vendor.url, path.separator,
> java.vm.name, file.encoding.pkg, user.country, sun.java.launcher,
> sun.os.patch.level, java.vm.specification.name, user.dir,
> java.runtime.version, java.awt.graphicsenv, java.endorsed.dirs, os.arch,
> java.io.tmpdir, line.separator, java.vm.specification.vendor, os.name,
> log4j.configuration, sun.jnu.encoding, java.library.path,
> java.specification.name, java.class.version, sun.management.compiler,
> os.version, user.home, user.timezone, java.awt.printerjob, file.encoding,
> java.specification.version, log4j.configurationFile, user.name,
> java.class.path, log.file, java.vm.specification.version,
> sun.arch.data.model, java.home, sun.java.command, java.specification.vendor,
> user.language, awt.toolkit, java.vm.info, java.version, java.ext.dirs,
> sun.boot.class.path, java.vendor, logback.configurationFile,
> java.security.auth.login.config, file.separator, java.vendor.url.bug,
> sun.cpu.endian, sun.io.unicode.encoding, sun.cpu.isalist] {code}
> Environment:
> JDK: Oracle 1.8/25.121-b13
> Flink flink-1.13.0
>
> What I have tried:
>
> {code:java}
> -Denv.java.opts.client="-Dkafka.start_from_timestamp=1648828800000"
> -Denv.java.opts="-Dkafka.start_from_timestamp=1648828800001"
> -Dkafka.start_from_timestamp=1648828800002
> -yD env.java.opts="kafka.start_from_timestamp=1648828800003"
> -yD env.java.opts.client="kafka.start_from_timestamp=1648828800003" {code}
>
>
> submit command:
> {code:java}
> bin/flink run -yarnjobManagerMemory 1G --yarntaskManagerMemory 1G --yarnqueue
> root.users.appuser --yarnslots 1 --yarnname SocketWindowWordCount -m
> yarn-cluster --class com.slankka.learn.rtc.SocketWindowWordCount
> -Denv.java.opts="-Dkafka.start_from_timestamp=1648828800001"
> -Dkafka.start_from_timestamp=1648828800002 -yD
> env.java.opts="kafka.start_from_timestamp=1648828800003" -d
> /data/files_upload/socketWindowWordCount.jar -hostname 10.11.159.156 --port
> 7890 {code}
> Another approach:
> when I put JVM args into flink-conf.yaml, it works.
>
> I think the code in config.sh may indicates something:
> {code:java}
> if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
> FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS}
> "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
> # Remove leading and ending double quotes (if present) of value
> FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'
> -e 's/"$//' )"
> fi {code}
> The code above shows that config.sh is reading env.java.opts from
> flink-config.yaml, if both of FLINK_ENV_JAVA_OPTS and flink-config.yaml are
> not defined, the actual system properties are ignored.
>
> solution I have tried:
> {code:java}
> FLINK_ENV_JAVA_OPTS="-Dkafka.start_from_timestamp=1648828800009" bin/flink
> run .... -yD env.java.opts.client="-Dkafka.start_from_timestamp=1648828800000
> ... {code}
> output:
> {code:java}
> FLINK_CLI_JVM_OPTIONS-Dkafka.start_from_timestamp=1648828800000 //read from
> flink env configuration
> -Dkafka.start_from_timestamp is1648828800009 //read from system
> properties{code}
>
> source code in CoreOptions.java
> {code:java}
> public static final ConfigOption<String> FLINK_CLI_JVM_OPTIONS =
> ConfigOptions.key("env.java.opts.client")
> .stringType()
> .defaultValue("")
> .withDescription(
> Description.builder()
> .text("Java options to start the JVM of the
> Flink Client with.")
> .build()); {code}
> The ConfigOptions is different with java system properties.
>
> The way to specify system properties for job class, which is right and
> recommended?
> * through "-yD env.java.opts.client=yyyy" with "-m yarn-cluster" (as option
> works, as system properties does not work).
> * through "-yD env.java.opts=yyyy" with "-m yarn-cluster" (same as above).
> * through dynamic properties "-D xxx=yyy" (this may only supports flink
> pre-defined options).
> * through FLINK_CONF_DIR/flink-conf.yaml (sometime it is not agile for
> multi-users scenario).
> * through FLINK_ENV_JAVA_OPTS (not defined but tested in config.sh) or
> FLINK_ENV_JAVA_OPTS_CLI. (tested as above).
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)