xintongsong commented on a change in pull request #14538:
URL: https://github.com/apache/flink/pull/14538#discussion_r600140941
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
##########
@@ -260,8 +262,10 @@
.noDefaultValue()
.withDescription(
"A semicolon-separated list of archives to be
shipped to the YARN cluster."
- + " These archives will be un-packed when
localizing and they can be any of the following types: "
- + "\".tar.gz\", \".tar\", \".tgz\",
\".dst\", \".jar\", \".zip\".");
+ + " These archives can come from the local
client and remote file system,"
+ + " they will be un-packed when localizing
and they can be any of the following types: "
Review comment:
nit:
```suggestion
+ " These archives can come from the
local client and remote file system."
+ " They will be un-packed when
localizing and they can be any of the following types: "
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -183,13 +184,13 @@ public YarnClusterDescriptor(
this.nodeLabel =
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
}
- private Optional<List<File>> decodeFilesToShipToCluster(
+ private Optional<List<Path>> decodeFilesToShipToCluster(
final Configuration configuration, final
ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
checkNotNull(configOption);
- final List<File> files =
- ConfigUtils.decodeListFromConfig(configuration, configOption,
File::new);
+ final List<Path> files =
+ ConfigUtils.decodeListFromConfig(configuration, configOption,
Path::new);
Review comment:
There are differences between creating a `Path` directly from a string
and creating a `Path` from the URI of a `File` created from the same string.
```
String s = "a/b/c";
System.out.println(new Path(s));
System.out.println(new File(s).toURI());
### Output ###
a/b/c
file:/path/to/current/dir/a/b/c
```
I haven't carefully checked whether these differences cause problems. At the
very least, it requires carefully dealing with relative/absolute paths, and
with the default scheme depending on the Flink/Hadoop configurations.
I'd suggest ensuring that all results of this method are absolute paths with
an explicit scheme. That should make the later processing simpler.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1731,10 +1748,27 @@ ContainerLaunchContext setupApplicationMasterContainer(
return config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
}
- private static boolean isUsrLibDirIncludedInShipFiles(List<File>
shipFiles) {
+ private static boolean isUsrLibDirIncludedInShipFiles(
Review comment:
Same as `isArchiveOnlyIncludedInShipArchiveFiles`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
final List<String> systemClassPaths =
fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
- systemShipFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
+ systemShipFiles.stream().collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to
classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
- Set<File> shipOnlyFiles = new HashSet<>();
+ Set<Path> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
- shipOnlyFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
+ shipOnlyFiles.stream().collect(Collectors.toSet()),
Review comment:
```suggestion
new HashSet<>(shipOnlyFiles),
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -163,7 +164,15 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
if (Utils.isRemotePath(resourcePath.toString())) {
- final FileStatus fileStatus =
fileSystem.getFileStatus(resourcePath);
+ final FileSystem srcFs =
resourcePath.getFileSystem(fileSystem.getConf());
+ final FileStatus fileStatus;
+ if (srcFs.getScheme().equals(fileSystem.getScheme())) {
+ fileStatus = srcFs.getFileStatus(resourcePath);
+ } else {
+ Tuple2<Path, Long> remoteFileInfo =
+ uploadLocalFileToRemote(resourcePath, relativeDstPath,
false);
Review comment:
1. Why do we need to copy the file from another remote filesystem to the
default remote filesystem?
2. The method name `uploadLocalFileToRemote` is misleading. The file being
uploaded is not local.
3. `uploadLocalFileToRemote` internally uses
`o.a.hadoop.fs.FileUtil.copy()`. Looking into the Hadoop source code, this
util method opens the src/dst files as `Input`/`OutputStream`s and copies the
bytes. That means all the data goes through the local client.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -286,18 +287,34 @@ public void addShipFiles(List<File> shipFiles) {
this.shipFiles.addAll(shipFiles);
}
- private void addShipArchives(List<File> shipArchives) {
+ public void addShipArchives(List<Path> shipArchives) {
checkArgument(
- isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
+ isArchiveOnlyIncludedInShipArchiveFiles(shipArchives,
yarnConfiguration),
"Non-archive files are included.");
this.shipArchives.addAll(shipArchives);
}
- private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File>
shipFiles) {
+ private static boolean isArchiveOnlyIncludedInShipArchiveFiles(
+ List<Path> shipFiles, YarnConfiguration yarnConfig) {
return shipFiles.stream()
- .filter(File::isFile)
- .map(File::getName)
- .map(String::toLowerCase)
+ .map(
+ FunctionUtils.uncheckedFunction(
+ shipFile -> {
+ String fileName = null;
+ if
(Utils.isRemotePath(shipFile.toString())) {
+ final FileSystem fs =
shipFile.getFileSystem(yarnConfig);
+ if (fs.isFile(shipFile)) {
+ fileName =
shipFile.getName().toLowerCase();
+ }
+ } else {
+ final File localFile = new
File(shipFile.toUri().getPath());
+ if (localFile.isFile()) {
+ fileName =
localFile.getName().toLowerCase();
+ }
+ }
+ return fileName;
+ }))
+ .filter(name -> name != null)
Review comment:
1. We should not need the if-else branch if all paths are guaranteed to
have a proper scheme. For a local path, we should get a local file system from
`shipFile.getFileSystem()`, and the remaining handling should be the same as
for remote paths.
2. `Utils.isRemotePath()` internally uses
`o.a.flink.core.fs.FileSystem/Path` rather than
`o.a.hadoop.fs.FileSystem/Path`. Mixing them could cause compatibility
issues.
3. It is very implicit that a null value of `name` indicates that `isFile`
returned `false`. Instead, we can first map `Path` to `FileStatus`.
Then the method would look like the following, with better readability.
```
private static boolean isArchiveOnlyIncludedInShipArchiveFiles(
List<Path> shipFiles, YarnConfiguration yarnConfig) {
return shipFiles.stream()
.map(FunctionUtils.uncheckedFunction(path ->
getFileStatus(path, yarnConfig)))
.filter(FileStatus::isFile)
.map(status -> status.getPath().getName().toLowerCase())
.allMatch(
name ->
name.endsWith(".tar.gz")
|| name.endsWith(".tar")
|| name.endsWith(".tgz")
|| name.endsWith(".dst")
|| name.endsWith(".jar")
|| name.endsWith(".zip"));
}
private static FileStatus getFileStatus(Path path, YarnConfiguration
yarnConfig)
throws IOException {
return path.getFileSystem(yarnConfig).getFileStatus(path);
}
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
final List<String> systemClassPaths =
fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
- systemShipFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
+ systemShipFiles.stream().collect(Collectors.toSet()),
Review comment:
```suggestion
new HashSet<>(systemShipFiles),
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -397,7 +422,12 @@ private Path copyToRemoteApplicationDir(
dst,
replicationFactor);
- fileSystem.copyFromLocalFile(false, true, localSrcPath, dst);
+ if (isLocal) {
+ fileSystem.copyFromLocalFile(false, true, localSrcPath, dst);
+ } else {
+ FileSystem srcFs =
localSrcPath.getFileSystem(fileSystem.getConf());
+ FileUtil.copy(srcFs, localSrcPath, fileSystem, dst, false,
fileSystem.getConf());
+ }
Review comment:
Same here, we should not need `isLocal` and the if-else branches.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -191,13 +200,26 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
}
Tuple2<Path, Long> uploadLocalFileToRemote(
- final Path localSrcPath, final String relativeDstPath) throws
IOException {
+ final Path localSrcPath, final String relativeDstPath, final
boolean isLocal)
+ throws IOException {
- final File localFile = new File(localSrcPath.toUri().getPath());
- checkArgument(
- !localFile.isDirectory(), "File to copy cannot be a directory:
" + localSrcPath);
+ final Long lastModified;
+ if (isLocal) {
+ final File localFile = new File(localSrcPath.toUri().getPath());
+ checkArgument(
+ !localFile.isDirectory(),
+ "File to copy cannot be a directory: " + localSrcPath);
+ lastModified = localFile.lastModified();
+ } else {
+ final FileSystem srcFs =
localSrcPath.getFileSystem(fileSystem.getConf());
+ checkArgument(
+ !srcFs.isDirectory(localSrcPath),
+ "File to copy cannot be a directory: " + localSrcPath);
+ lastModified =
srcFs.getFileStatus(localSrcPath).getModificationTime();
+ }
Review comment:
Same here, we should not need `isLocal` and the if-else branches.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
final List<String> systemClassPaths =
fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
- systemShipFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
+ systemShipFiles.stream().collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to
classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
- Set<File> shipOnlyFiles = new HashSet<>();
+ Set<Path> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
- shipOnlyFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
+ shipOnlyFiles.stream().collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
- shipArchives.stream().map(e -> new
Path(e.toURI())).collect(Collectors.toSet()),
+ shipArchives.stream().collect(Collectors.toSet()),
Review comment:
```suggestion
new HashSet<>(shipArchives),
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -795,15 +812,15 @@ private ApplicationReport startAppMaster(
getFileReplication());
// The files need to be shipped and added to classpath.
- Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
- for (File file : shipFiles) {
- systemShipFiles.add(file.getAbsoluteFile());
+ Set<Path> systemShipFiles = new HashSet<>(shipFiles.size());
+ for (Path file : shipFiles) {
+ systemShipFiles.add(file);
Review comment:
The for-loop is no longer needed if we initialize the set with `new
HashSet<>(shipFiles)`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1665,9 +1678,13 @@ void addLibFoldersToShipFiles(Collection<File>
effectiveShipFiles) {
}
@VisibleForTesting
- void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
+ void addPluginsFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
final Optional<File> pluginsDir = PluginConfig.getPluginsDir();
- pluginsDir.ifPresent(effectiveShipFiles::add);
+ final Optional<Path> pluginsDirPath =
+ pluginsDir.isPresent()
+ ? Optional.of(new Path(pluginsDir.get().toURI()))
+ : Optional.empty();
+ pluginsDirPath.ifPresent(effectiveShipFiles::add);
Review comment:
```suggestion
pluginsDir.map(file -> new
Path(file.toURI())).ifPresent(effectiveShipFiles::add);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]