This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b04a50ae21 [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system
6b04a50ae21 is described below
commit 6b04a50ae2182d4cdd8e44ea9a16171d1d2394ce
Author: Dian Fu <[email protected]>
AuthorDate: Sun Jun 19 22:47:58 2022 +0800
    [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system
---
docs/content.zh/docs/deployment/cli.md | 3 +++
docs/content/docs/deployment/cli.md | 3 +++
.../apache/flink/client/python/PythonEnvUtils.java | 27 +++++++++++++++++++---
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md
index a4e5d3ea770..981ab7747c1 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -434,6 +434,9 @@ $ ./bin/flink run-application -t yarn-application \
<span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles` which is the directory name of the shipped files.
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through the blob server, where the file size limit is 2 GB.
+If the size of an archive file exceeds 2 GB, you can upload it to a distributed file system and use its path in the command line option `-pyarch`.
+
- Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
```bash
$ ./bin/flink run-application \
diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md
index 8f81153d9e7..783b302bc6c 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -432,6 +432,9 @@ $ ./bin/flink run-application -t yarn-application \
<span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles` which is the directory name of the shipped files.
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through the blob server, where the file size limit is 2 GB.
+If the size of an archive file exceeds 2 GB, you can upload it to a distributed file system and use its path in the command line option `-pyarch`.
+
- Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
```bash
$ ./bin/flink run-application \
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 261aa82dc76..f018703dcdc 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -171,12 +171,33 @@ final class PythonEnvUtils {
                                    originalFileName = archivePath.getName();
                                } else {
                                    archivePath = new Path(archive);
-                                    targetDirName = archivePath.getName();
-                                    originalFileName = targetDirName;
+                                    originalFileName = archivePath.getName();
+                                    targetDirName = originalFileName;
+                                }
+
+                                Path localArchivePath = archivePath;
+                                try {
+                                    if (archivePath.getFileSystem().isDistributedFS()) {
+                                        localArchivePath =
+                                                new Path(
+                                                        env.tempDirectory,
+                                                        String.join(
+                                                                File.separator,
+                                                                UUID.randomUUID().toString(),
+                                                                originalFileName));
+                                        FileUtils.copy(archivePath, localArchivePath, false);
+                                    }
+                                } catch (IOException e) {
+                                    String msg =
+                                            String.format(
+                                                    "Error occurred when copying %s to %s.",
+                                                    archivePath, localArchivePath);
+                                    throw new RuntimeException(msg, e);
                                }
+
                                try {
                                    CompressionUtils.extractFile(
-                                            archivePath.getPath(),
+                                            localArchivePath.getPath(),
                                            String.join(
                                                    File.separator,
                                                    env.archivesDirectory,
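The core idea of the fix can be sketched outside of Flink: when the archive URI points at a distributed file system, copy the file into a unique local temporary directory first and extract from the local copy; local paths pass through unchanged. The sketch below uses only the JDK — `isDistributedFS` (here a simple URI-scheme check), the `mock://` scheme, and the file names are illustrative stand-ins, not Flink's actual `FileSystem`/`FileUtils` API.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class LocalizeArchive {

    // Stand-in for Flink's FileSystem#isDistributedFS: treat any
    // non-"file" URI scheme as a distributed file system.
    static boolean isDistributedFS(URI uri) {
        String scheme = uri.getScheme();
        return scheme != null && !scheme.equals("file");
    }

    // Mirrors the patch: if the archive lives on a distributed FS, copy it
    // into <tempDirectory>/<random-uuid>/<original-file-name> and return the
    // local path; otherwise return the original path directly.
    static Path localize(URI archive, Path tempDirectory) throws IOException {
        Path source = Paths.get(archive.getPath());
        if (!isDistributedFS(archive)) {
            return source;
        }
        Path local =
                tempDirectory
                        .resolve(UUID.randomUUID().toString())
                        .resolve(source.getFileName().toString());
        Files.createDirectories(local.getParent());
        // In Flink this is FileUtils.copy(archivePath, localArchivePath, false).
        Files.copy(source, local, StandardCopyOption.REPLACE_EXISTING);
        return local;
    }

    public static void main(String[] args) throws IOException {
        Path temp = Files.createTempDirectory("pyflink-archives");
        // A local archive path is returned unchanged; a distributed one
        // would be copied under the temp directory before extraction.
        System.out.println(localize(URI.create("file:///tmp/venv.zip"), temp));
    }
}
```

The random UUID segment keeps concurrent localizations of archives with the same file name from clashing, which is why the original file name is preserved only as the last path component.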