This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f8b0834fb83 [FLINK-31214][python] Add support for new command line option -py.pythonpath
f8b0834fb83 is described below
commit f8b0834fb83b7d17ac9eeaf09f2f6cea070c978a
Author: Samrat002 <[email protected]>
AuthorDate: Mon Mar 20 18:23:58 2023 +0530
[FLINK-31214][python] Add support for new command line option -py.pythonpath
---
.../shortcodes/generated/python_configuration.html | 2 +-
.../apache/flink/client/cli/CliFrontendParser.java | 13 ++++++++++
.../flink/client/cli/ProgramOptionsUtils.java | 8 +++---
.../org/apache/flink/python/PythonOptions.java | 3 ++-
.../flink/python/util/PythonDependencyUtils.java | 9 +++++++
.../flink/client/cli/PythonProgramOptionsTest.java | 30 +++++++++++++++++-----
.../org/apache/flink/python/PythonOptionsTest.java | 14 ++++++++++
.../python/util/PythonDependencyUtilsTest.java | 12 +++++++++
.../flink/table/client/cli/CliOptionsParser.java | 2 ++
.../src/test/resources/cli/all-mode-help.out | 6 +++++
.../src/test/resources/cli/embedded-mode-help.out | 6 +++++
11 files changed, 93 insertions(+), 12 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html
b/docs/layouts/shortcodes/generated/python_configuration.html
index eb9c66795c8..f7027cc1e85 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -102,7 +102,7 @@
<td><h5>python.pythonpath</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Specify the path on the Worker Node where the Flink Python
Dependencies are installed, which gets added into the PYTHONPATH of the Python
Worker. </td>
+ <td>Specify the path on the Worker Node where the Flink Python
Dependencies are installed, which gets added into the PYTHONPATH of the Python
Worker. The option is equivalent to the command line option "-pypath".</td>
</tr>
<tr>
<td><h5>python.requirements</h5></td>
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index b3bdf10c7e0..837def6af5e 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -278,6 +278,15 @@ public class CliFrontendParser {
+ "process when submitting the Python jobs via
\"flink run\" or compiling "
+ "the Java/Scala jobs containing Python UDFs.");
+ public static final Option PYTHON_PATH =
+ new Option(
+ "pypath",
+ "pyPythonPath",
+ true,
+ "Specify the path of the python installation in worker
nodes."
+ + "(e.g.: --pyPythonPath
/python/lib64/python3.7/)."
+ + "User can specify multiple paths using the
separator \":\".");
+
static {
HELP_OPTION.setRequired(false);
@@ -344,6 +353,8 @@ public class CliFrontendParser {
PYEXEC_OPTION.setRequired(false);
PYCLIENTEXEC_OPTION.setRequired(false);
+
+ PYTHON_PATH.setRequired(false);
}
static final Options RUN_OPTIONS = getRunCommandOptions();
@@ -371,6 +382,7 @@ public class CliFrontendParser {
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(PYCLIENTEXEC_OPTION);
+ options.addOption(PYTHON_PATH);
return options;
}
@@ -387,6 +399,7 @@ public class CliFrontendParser {
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(PYCLIENTEXEC_OPTION);
+ options.addOption(PYTHON_PATH);
return options;
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
index 4296ad2c3e4..07b4ab8f8dd 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
@@ -41,6 +41,7 @@ import static
org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
import static
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
/** Utility class for {@link ProgramOptions}. */
@@ -61,15 +62,16 @@ public enum ProgramOptionsUtils {
}
/**
- * @return True if the commandline contains "-pyfs", "-pyarch", "-pyreq",
"-pyexec" options,
- * false otherwise.
+ * @return True if the commandline contains "-pyfs", "-pyarch", "-pyreq",
"-pyexec", "-pypath"
+ * options, false otherwise.
*/
public static boolean containsPythonDependencyOptions(CommandLine line) {
return line.hasOption(PYFILES_OPTION.getOpt())
|| line.hasOption(PYREQUIREMENTS_OPTION.getOpt())
|| line.hasOption(PYARCHIVE_OPTION.getOpt())
|| line.hasOption(PYEXEC_OPTION.getOpt())
- || line.hasOption(PYCLIENTEXEC_OPTION.getOpt());
+ || line.hasOption(PYCLIENTEXEC_OPTION.getOpt())
+ || line.hasOption(PYTHON_PATH.getOpt());
}
public static ProgramOptions createPythonProgramOptions(CommandLine line)
diff --git
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 3a83fd76105..c40a436bfcf 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -131,7 +131,8 @@ public class PythonOptions {
Description.builder()
.text(
"Specify the path on the Worker
Node where the Flink Python Dependencies are installed, which "
- + "gets added into the
PYTHONPATH of the Python Worker. ")
+ + "gets added into the
PYTHONPATH of the Python Worker."
+ + " The option is
equivalent to the command line option \"-pypath\".")
.build());
public static final ConfigOption<String> PYTHON_ARCHIVES =
diff --git
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
index c93a6f49949..f5c56d13d8e 100644
---
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
+++
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
@@ -44,6 +44,7 @@ import static
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
import static
org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO;
import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE;
@@ -110,6 +111,9 @@ public class PythonDependencyUtils {
PythonOptions.PYTHON_CLIENT_EXECUTABLE,
commandLine.getOptionValue(PYCLIENTEXEC_OPTION.getOpt()));
}
+ if (commandLine.hasOption(PYTHON_PATH.getOpt())) {
+ config.set(PythonOptions.PYTHON_PATH,
commandLine.getOptionValue(PYTHON_PATH.getOpt()));
+ }
return config;
}
@@ -312,6 +316,11 @@ public class PythonDependencyUtils {
config.getOptional(PYTHON_CLIENT_EXECUTABLE)
.ifPresent(e ->
pythonDependencyConfig.set(PYTHON_CLIENT_EXECUTABLE, e));
+
+ config.getOptional(PythonOptions.PYTHON_PATH)
+ .ifPresent(
+ pyPath ->
+
pythonDependencyConfig.set(PythonOptions.PYTHON_PATH, pyPath));
}
private String generateUniqueFileKey(String prefix, String hashString)
{
diff --git
a/flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java
b/flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java
index e8cc69aa4d6..1b2cd4e5848 100644
---
a/flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java
+++
b/flink-python/src/test/java/org/apache/flink/client/cli/PythonProgramOptionsTest.java
@@ -31,6 +31,7 @@ import static
org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
import static
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE;
import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS;
@@ -49,6 +50,7 @@ class PythonProgramOptionsTest {
options.addOption(PYREQUIREMENTS_OPTION);
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
+ options.addOption(PYTHON_PATH);
}
@Test
@@ -60,6 +62,7 @@ class PythonProgramOptionsTest {
"-pyreq", "a.txt#b_dir",
"-pyarch", "c.zip#venv,d.zip",
"-pyexec", "bin/python",
+ "-pypath", "bin/python/lib/:bin/python/lib64",
"userarg1", "userarg2"
};
@@ -72,6 +75,8 @@ class PythonProgramOptionsTest {
assertThat(config.get(PYTHON_REQUIREMENTS)).isEqualTo("a.txt#b_dir");
assertThat(config.get(PythonOptions.PYTHON_ARCHIVES)).isEqualTo("c.zip#venv,d.zip");
assertThat(config.get(PYTHON_EXECUTABLE)).isEqualTo("bin/python");
+ assertThat(config.get(PythonOptions.PYTHON_PATH))
+ .isEqualTo("bin/python/lib/:bin/python/lib64");
assertThat(programOptions.getProgramArgs())
.containsExactly(
"--python", "test.py", "--pyModule", "test",
"userarg1", "userarg2");
@@ -80,13 +85,22 @@ class PythonProgramOptionsTest {
@Test
void testCreateProgramOptionsWithLongOptions() throws CliArgsException {
String[] args = {
- "--python", "xxx.py",
- "--pyModule", "xxx",
- "--pyFiles", "/absolute/a.py,relative/b.py,relative/c.py",
- "--pyRequirements", "d.txt#e_dir",
- "--pyExecutable", "/usr/bin/python",
- "--pyArchives", "g.zip,h.zip#data,h.zip#data2",
- "userarg1", "userarg2"
+ "--python",
+ "xxx.py",
+ "--pyModule",
+ "xxx",
+ "--pyFiles",
+ "/absolute/a.py,relative/b.py,relative/c.py",
+ "--pyRequirements",
+ "d.txt#e_dir",
+ "--pyExecutable",
+ "/usr/bin/python",
+ "--pyArchives",
+ "g.zip,h.zip#data,h.zip#data2",
+ "--pyPythonPath",
+ "bin/python/lib/:bin/python/lib64",
+ "userarg1",
+ "userarg2"
};
CommandLine line = CliFrontendParser.parse(options, args, false);
PythonProgramOptions programOptions = (PythonProgramOptions)
ProgramOptions.create(line);
@@ -98,6 +112,8 @@ class PythonProgramOptionsTest {
assertThat(config.get(PythonOptions.PYTHON_ARCHIVES))
.isEqualTo("g.zip,h.zip#data,h.zip#data2");
assertThat(config.get(PYTHON_EXECUTABLE)).isEqualTo("/usr/bin/python");
+ assertThat(config.get(PythonOptions.PYTHON_PATH))
+ .isEqualTo("bin/python/lib/:bin/python/lib64");
assertThat(programOptions.getProgramArgs())
.containsExactly("--python", "xxx.py", "--pyModule", "xxx",
"userarg1", "userarg2");
}
diff --git
a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
index d02b93f852f..fc8e49f720d 100644
--- a/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/PythonOptionsTest.java
@@ -188,4 +188,18 @@ class PythonOptionsTest {
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
}
+
+ @Test
+ void testPythonPath() {
+ final Configuration configuration = new Configuration();
+ final Optional<String> defaultPythonPath =
+ configuration.getOptional(PythonOptions.PYTHON_PATH);
+ assertThat(defaultPythonPath).isEmpty();
+
+ final String expectedPythonPath = "venv/py37/bin/python";
+ configuration.set(PythonOptions.PYTHON_PATH, expectedPythonPath);
+
+ final String actualPythonPath =
configuration.get(PythonOptions.PYTHON_PATH);
+ assertThat(actualPythonPath).isEqualTo(expectedPythonPath);
+ }
}
diff --git
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
index 9d216a163f7..1b70729973b 100644
---
a/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
+++
b/flink-python/src/test/java/org/apache/flink/python/util/PythonDependencyUtilsTest.java
@@ -234,6 +234,18 @@ class PythonDependencyUtilsTest {
verifyConfiguration(expectedConfiguration, config);
}
+ @Test
+ void testPythonPath() {
+ String pyPath =
+
"venv/bin/python3/lib64/python3.7/site-packages/:venv/bin/python3/lib/python3.7/site-packages/";
+ Configuration config = new Configuration();
+ config.set(PythonOptions.PYTHON_PATH, pyPath);
+ Configuration actual = configurePythonDependencies(cachedFiles,
config);
+ Configuration expectedConfiguration = new Configuration();
+ expectedConfiguration.set(PythonOptions.PYTHON_PATH, pyPath);
+ verifyConfiguration(expectedConfiguration, actual);
+ }
+
private void verifyCachedFiles(Map<String, String> expected) {
Map<String, String> actual =
cachedFiles.stream().collect(Collectors.toMap(t -> t.f0, t ->
t.f1.filePath));
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index 597564d6a71..b620673c2de 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -44,6 +44,7 @@ import static
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
/** Parser for command line options. */
public class CliOptionsParser {
@@ -183,6 +184,7 @@ public class CliOptionsParser {
options.addOption(PYARCHIVE_OPTION);
options.addOption(PYEXEC_OPTION);
options.addOption(PYCLIENTEXEC_OPTION);
+ options.addOption(PYTHON_PATH);
return options;
}
diff --git
a/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
b/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
index 4f17837fbe6..fbd865721dc 100644
--- a/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
+++ b/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
@@ -106,6 +106,12 @@ Mode "embedded" (default) submits Flink jobs from the
local machine.
file:///tmp/myresource.zip,hdfs:
///$namenode_address/myresource2
.zip).
+ -pypath,--pyPythonPath <arg> Specify the path of the python
+ installation in worker
+ nodes.(e.g.: --pyPythonPath
+ /python/lib64/python3.7/).User
+ can specify multiple paths
using
+ the separator ":".
-pyreq,--pyRequirements <arg> Specify a requirements.txt file
which defines the third-party
dependencies. These
dependencies
diff --git
a/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
b/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
index 4b69340e07f..dcea2fdcde9 100644
--- a/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
+++ b/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
@@ -103,6 +103,12 @@ Mode "embedded" (default) submits Flink jobs from the
local machine.
file:///tmp/myresource.zip,hdfs:
///$namenode_address/myresource2
.zip).
+ -pypath,--pyPythonPath <arg> Specify the path of the python
+ installation in worker
+ nodes.(e.g.: --pyPythonPath
+ /python/lib64/python3.7/).User
+ can specify multiple paths
using
+ the separator ":".
-pyreq,--pyRequirements <arg> Specify a requirements.txt file
which defines the third-party
dependencies. These
dependencies