xintongsong commented on a change in pull request #14538:
URL: https://github.com/apache/flink/pull/14538#discussion_r600140941



##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
##########
@@ -260,8 +262,10 @@
                     .noDefaultValue()
                     .withDescription(
                             "A semicolon-separated list of archives to be shipped to the YARN cluster."
-                                    + " These archives will be un-packed when localizing and they can be any of the following types: "
-                                    + "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
+                                    + " These archives can come from the local client and remote file system,"
+                                    + " they will be un-packed when localizing and they can be any of the following types: "

Review comment:
       nit:
   ```suggestion
                                       + " These archives can come from the local client and remote file system."
                                       + " They will be un-packed when localizing and they can be any of the following types: "
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -183,13 +184,13 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        final List<Path> files =
+                ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);

Review comment:
       There are differences between creating a `Path` directly from a string and creating a `Path` from the URI of a `File` created from a string.
   ```
   String s = "a/b/c";
   System.out.println(new Path(s));
   System.out.println(new File(s).toURI());
   
   ### Output ###
   a/b/c
   file:/path/to/current/dir/a/b/c
   ```
   
   I haven't carefully checked whether these differences cause problems. At the very least, they require careful handling of relative/absolute paths and of the default scheme, which depends on the Flink/Hadoop configurations.
   
   I'd suggest ensuring that all results of this method are absolute paths with an explicit scheme. That should make the later processing simpler.

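A minimal sketch of that normalization, using only `java.net.URI` and `java.io.File` so it runs without Hadoop on the classpath. `ShipPathNormalizer` and `normalize` are hypothetical names; with Hadoop types, `FileSystem#makeQualified(Path)` is the comparable building block. The sketch assumes every input is either scheme-qualified or a local path that parses as a URI (no spaces, no Windows drive letters).

```java
import java.io.File;
import java.net.URI;

/** Hypothetical helper: every returned URI is absolute and carries an explicit scheme. */
final class ShipPathNormalizer {

    private ShipPathNormalizer() {}

    static URI normalize(String raw) {
        // Assumption: raw parses as a URI (e.g. no spaces, no Windows drive letters).
        URI uri = URI.create(raw);
        if (uri.getScheme() != null) {
            // Already qualified, e.g. "hdfs://namenode:8020/libs/udf.zip": keep as-is.
            return uri;
        }
        // No scheme: treat it as a local path and resolve it against the working
        // directory, which yields "file:/<cwd>/<raw>" as File#toURI does in the example above.
        return new File(raw).getAbsoluteFile().toURI();
    }
}
```

Doing this once, right where the option is decoded, means later code never has to guess whether a path is relative or which file system it belongs to.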
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1731,10 +1748,27 @@ ContainerLaunchContext setupApplicationMasterContainer(
         return config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
     }
 
-    private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
+    private static boolean isUsrLibDirIncludedInShipFiles(

Review comment:
       Same as `isArchiveOnlyIncludedInShipArchiveFiles`.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
         final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
         final List<String> uploadedDependencies =
                 fileUploader.registerMultipleLocalResources(
-                        systemShipFiles.stream()
-                                .map(e -> new Path(e.toURI()))
-                                .collect(Collectors.toSet()),
+                        systemShipFiles.stream().collect(Collectors.toSet()),
                         Path.CUR_DIR,
                         LocalResourceType.FILE);
         systemClassPaths.addAll(uploadedDependencies);
 
         // upload and register ship-only files
         // Plugin files only need to be shipped and should not be added to classpath.
         if (providedLibDirs == null || providedLibDirs.isEmpty()) {
-            Set<File> shipOnlyFiles = new HashSet<>();
+            Set<Path> shipOnlyFiles = new HashSet<>();
             addPluginsFoldersToShipFiles(shipOnlyFiles);
             fileUploader.registerMultipleLocalResources(
-                    shipOnlyFiles.stream()
-                            .map(e -> new Path(e.toURI()))
-                            .collect(Collectors.toSet()),
+                    shipOnlyFiles.stream().collect(Collectors.toSet()),

Review comment:
       ```suggestion
                       new HashSet<>(shipOnlyFiles),
   ```
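For context on this suggestion: both forms yield a set with the same elements, but the copy constructor states the intent directly, and the `Collectors.toSet()` Javadoc gives no guarantee about the type or mutability of the returned `Set`. A small stdlib illustration, using `String` in place of Hadoop's `Path` (an assumption so the snippet runs standalone; `SetCopyDemo` is a hypothetical name):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

final class SetCopyDemo {

    private SetCopyDemo() {}

    /** Current form in the patch: a stream round-trip that only copies elements. */
    static Set<String> viaStream(Collection<String> files) {
        return files.stream().collect(Collectors.toSet());
    }

    /** Suggested form: the copy constructor yields a mutable HashSet directly. */
    static Set<String> viaCopyConstructor(Collection<String> files) {
        return new HashSet<>(files);
    }
}
```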

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -163,7 +164,15 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
         addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
 
         if (Utils.isRemotePath(resourcePath.toString())) {
-            final FileStatus fileStatus = fileSystem.getFileStatus(resourcePath);
+            final FileSystem srcFs = resourcePath.getFileSystem(fileSystem.getConf());
+            final FileStatus fileStatus;
+            if (srcFs.getScheme().equals(fileSystem.getScheme())) {
+                fileStatus = srcFs.getFileStatus(resourcePath);
+            } else {
+                Tuple2<Path, Long> remoteFileInfo =
+                        uploadLocalFileToRemote(resourcePath, relativeDstPath, false);

Review comment:
       1. Why do we need to copy the file from another remote filesystem to the default remote filesystem?
   2. The method name `uploadLocalFileToRemote` is misleading. The file being uploaded is not local.
   3. `uploadLocalFileToRemote` internally uses `o.a.hadoop.fs.FileUtil.copy()`. Looking into the Hadoop source code, this util method opens the src/dst files as `InputStream`/`OutputStream` and copies the bytes. That means all the data goes through the local client.

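To make point 3 concrete: a stream-based copy pulls every byte into the client process and writes it back out, no matter where the source and destination live. A stdlib-only sketch of that pattern (`ClientSideCopy` is a hypothetical name, and this is a simplification, not Hadoop's actual code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical illustration of a client-side stream copy between two file systems. */
final class ClientSideCopy {

    private ClientSideCopy() {}

    /**
     * Copies src to dst through a local buffer and returns the number of bytes that
     * passed through this process. Both streams stay open; the caller closes them.
     */
    static long copy(InputStream src, OutputStream dst) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = src.read(buffer)) != -1) {
            dst.write(buffer, 0, read); // each chunk makes a round trip through the client
            total += read;
        }
        return total;
    }
}
```

For large archives on a remote file system, this means the full payload is downloaded to and re-uploaded from the submitting machine.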
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -286,18 +287,34 @@ public void addShipFiles(List<File> shipFiles) {
         this.shipFiles.addAll(shipFiles);
     }
 
-    private void addShipArchives(List<File> shipArchives) {
+    public void addShipArchives(List<Path> shipArchives) {
         checkArgument(
-                isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
+                isArchiveOnlyIncludedInShipArchiveFiles(shipArchives, yarnConfiguration),
                 "Non-archive files are included.");
         this.shipArchives.addAll(shipArchives);
     }
 
-    private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
+    private static boolean isArchiveOnlyIncludedInShipArchiveFiles(
+            List<Path> shipFiles, YarnConfiguration yarnConfig) {
         return shipFiles.stream()
-                .filter(File::isFile)
-                .map(File::getName)
-                .map(String::toLowerCase)
+                .map(
+                        FunctionUtils.uncheckedFunction(
+                                shipFile -> {
+                                    String fileName = null;
+                                    if (Utils.isRemotePath(shipFile.toString())) {
+                                        final FileSystem fs = shipFile.getFileSystem(yarnConfig);
+                                        if (fs.isFile(shipFile)) {
+                                            fileName = shipFile.getName().toLowerCase();
+                                        }
+                                    } else {
+                                        final File localFile = new File(shipFile.toUri().getPath());
+                                        if (localFile.isFile()) {
+                                            fileName = localFile.getName().toLowerCase();
+                                        }
+                                    }
+                                    return fileName;
+                                }))
+                .filter(name -> name != null)

Review comment:
       1. We should not need the if-else branch if all paths are guaranteed to have a proper scheme. For a local path, we should get a local file system from `shipFile.getFileSystem()`, and the remaining handling should be the same as for remote paths.
   2. `Utils.isRemotePath()` internally uses `o.a.flink.core.fs.FileSystem/Path` rather than `o.a.hadoop.fs.FileSystem/Path`. Mixing them could cause compatibility issues.
   3. It's very implicit that a null value of `name` indicates that `isFile` returned `false`. Instead, we can first map `Path` to `FileStatus`.
   
   Then the method would look like the following, with better readability.
   ```
       private static boolean isArchiveOnlyIncludedInShipArchiveFiles(
               List<Path> shipFiles, YarnConfiguration yarnConfig) {
           return shipFiles.stream()
                   .map(FunctionUtils.uncheckedFunction(path -> getFileStatus(path, yarnConfig)))
                   .filter(FileStatus::isFile)
                   .map(status -> status.getPath().getName().toLowerCase())
                   .allMatch(
                           name ->
                                   name.endsWith(".tar.gz")
                                           || name.endsWith(".tar")
                                           || name.endsWith(".tgz")
                                           || name.endsWith(".dst")
                                           || name.endsWith(".jar")
                                           || name.endsWith(".zip"));
       }
   
       private static FileStatus getFileStatus(Path path, YarnConfiguration yarnConfig)
               throws IOException {
           return path.getFileSystem(yarnConfig).getFileStatus(path);
       }
   ```
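The suffix chain at the end of the suggestion could also be driven by a single list, so the accepted archive types are declared in one place. A stdlib-only sketch; `ArchiveSuffixes` and `isArchiveName` are hypothetical names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical holder for the archive types accepted by the ship-archives option. */
final class ArchiveSuffixes {

    private ArchiveSuffixes() {}

    // Same suffixes as the allMatch(...) chain in the suggestion above.
    static final List<String> SUFFIXES =
            Arrays.asList(".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip");

    /** Returns true if the file name ends with one of the accepted archive suffixes. */
    static boolean isArchiveName(String fileName) {
        // Locale.ROOT avoids locale-sensitive lower-casing (e.g. the Turkish dotless i).
        String lower = fileName.toLowerCase(Locale.ROOT);
        return SUFFIXES.stream().anyMatch(lower::endsWith);
    }
}
```

With such a helper, the last step of the stream could become `.allMatch(ArchiveSuffixes::isArchiveName)`.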

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
         final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
         final List<String> uploadedDependencies =
                 fileUploader.registerMultipleLocalResources(
-                        systemShipFiles.stream()
-                                .map(e -> new Path(e.toURI()))
-                                .collect(Collectors.toSet()),
+                        systemShipFiles.stream().collect(Collectors.toSet()),

Review comment:
       ```suggestion
                           new HashSet<>(systemShipFiles),
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -397,7 +422,12 @@ private Path copyToRemoteApplicationDir(
                 dst,
                 replicationFactor);
 
-        fileSystem.copyFromLocalFile(false, true, localSrcPath, dst);
+        if (isLocal) {
+            fileSystem.copyFromLocalFile(false, true, localSrcPath, dst);
+        } else {
+            FileSystem srcFs = localSrcPath.getFileSystem(fileSystem.getConf());
+            FileUtil.copy(srcFs, localSrcPath, fileSystem, dst, false, fileSystem.getConf());
+        }

Review comment:
       Same here: we should not need `isLocal` and the if-else branches.

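The idea of resolving the file system from the path's scheme, instead of passing an `isLocal` flag, can be sketched with the JDK's NIO providers, which pick an implementation from the URI scheme in a way loosely analogous to Hadoop's `Path#getFileSystem(Configuration)`. This is only an analogy under that assumption; `SchemeAgnosticCopy` is a hypothetical name, and only `file:` URIs are exercised here.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/**
 * Hypothetical stdlib analogue of branch-free copying: the file system is looked up
 * from the URI scheme, so the caller never has to say whether the source is local.
 */
final class SchemeAgnosticCopy {

    private SchemeAgnosticCopy() {}

    static Path copy(URI src, URI dst) throws IOException {
        Path srcPath = Paths.get(src); // the installed provider for src's scheme handles it
        Path dstPath = Paths.get(dst);
        // One code path for every scheme; Files.copy also works across different providers.
        return Files.copy(srcPath, dstPath, StandardCopyOption.REPLACE_EXISTING);
    }
}
```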
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
##########
@@ -191,13 +200,26 @@ YarnLocalResourceDescriptor registerSingleLocalResource(
     }
 
     Tuple2<Path, Long> uploadLocalFileToRemote(
-            final Path localSrcPath, final String relativeDstPath) throws IOException {
+            final Path localSrcPath, final String relativeDstPath, final boolean isLocal)
+            throws IOException {
 
-        final File localFile = new File(localSrcPath.toUri().getPath());
-        checkArgument(
-                !localFile.isDirectory(), "File to copy cannot be a directory: " + localSrcPath);
+        final Long lastModified;
+        if (isLocal) {
+            final File localFile = new File(localSrcPath.toUri().getPath());
+            checkArgument(
+                    !localFile.isDirectory(),
+                    "File to copy cannot be a directory: " + localSrcPath);
+            lastModified = localFile.lastModified();
+        } else {
+            final FileSystem srcFs = localSrcPath.getFileSystem(fileSystem.getConf());
+            checkArgument(
+                    !srcFs.isDirectory(localSrcPath),
+                    "File to copy cannot be a directory: " + localSrcPath);
+            lastModified = srcFs.getFileStatus(localSrcPath).getModificationTime();
+        }

Review comment:
       Same here: we should not need `isLocal` and the if-else branches.

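In Hadoop terms, dropping the branch would presumably mean one `getFileStatus` call whose `isDirectory()` and `getModificationTime()` serve both local and remote paths. A stdlib-only analogue of those two checks with no `isLocal` flag (`SourceFileInfo` is a hypothetical name; NIO stands in for Hadoop's `FileSystem`):

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical stdlib analogue of the checks in uploadLocalFileToRemote, with no isLocal flag. */
final class SourceFileInfo {

    private SourceFileInfo() {}

    /** Rejects directories and returns the last-modified time in milliseconds. */
    static long lastModifiedMillis(URI src) throws IOException {
        Path path = Paths.get(src); // file system resolved from the URI scheme
        if (Files.isDirectory(path)) {
            // Same exception type that checkArgument throws in the original code.
            throw new IllegalArgumentException("File to copy cannot be a directory: " + src);
        }
        return Files.getLastModifiedTime(path).toMillis();
    }
}
```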
##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -869,29 +886,25 @@ private ApplicationReport startAppMaster(
         final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
         final List<String> uploadedDependencies =
                 fileUploader.registerMultipleLocalResources(
-                        systemShipFiles.stream()
-                                .map(e -> new Path(e.toURI()))
-                                .collect(Collectors.toSet()),
+                        systemShipFiles.stream().collect(Collectors.toSet()),
                         Path.CUR_DIR,
                         LocalResourceType.FILE);
         systemClassPaths.addAll(uploadedDependencies);
 
         // upload and register ship-only files
         // Plugin files only need to be shipped and should not be added to classpath.
         if (providedLibDirs == null || providedLibDirs.isEmpty()) {
-            Set<File> shipOnlyFiles = new HashSet<>();
+            Set<Path> shipOnlyFiles = new HashSet<>();
             addPluginsFoldersToShipFiles(shipOnlyFiles);
             fileUploader.registerMultipleLocalResources(
-                    shipOnlyFiles.stream()
-                            .map(e -> new Path(e.toURI()))
-                            .collect(Collectors.toSet()),
+                    shipOnlyFiles.stream().collect(Collectors.toSet()),
                     Path.CUR_DIR,
                     LocalResourceType.FILE);
         }
 
         if (!shipArchives.isEmpty()) {
             fileUploader.registerMultipleLocalResources(
-                    shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
+                    shipArchives.stream().collect(Collectors.toSet()),

Review comment:
       ```suggestion
                       new HashSet<>(shipArchives),
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -795,15 +812,15 @@ private ApplicationReport startAppMaster(
                         getFileReplication());
 
         // The files need to be shipped and added to classpath.
-        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
-        for (File file : shipFiles) {
-            systemShipFiles.add(file.getAbsoluteFile());
+        Set<Path> systemShipFiles = new HashSet<>(shipFiles.size());
+        for (Path file : shipFiles) {
+            systemShipFiles.add(file);

Review comment:
       The for-loop is no longer needed if we initialize the set with `new HashSet<>(shipFiles)`.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1665,9 +1678,13 @@ void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
     }
 
     @VisibleForTesting
-    void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
+    void addPluginsFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
         final Optional<File> pluginsDir = PluginConfig.getPluginsDir();
-        pluginsDir.ifPresent(effectiveShipFiles::add);
+        final Optional<Path> pluginsDirPath =
+                pluginsDir.isPresent()
+                        ? Optional.of(new Path(pluginsDir.get().toURI()))
+                        : Optional.empty();
+        pluginsDirPath.ifPresent(effectiveShipFiles::add);

Review comment:
       ```suggestion
           pluginsDir.map(file -> new Path(file.toURI())).ifPresent(effectiveShipFiles::add);
   ```
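The suggested `Optional#map` chain replaces the explicit `isPresent()`/`get()` ternary: `map` is simply skipped for an empty `Optional`. A stdlib-only sketch of the same shape, using `java.net.URI` in place of Hadoop's `Path` (an assumption so it runs standalone; `PluginsDirShipping` and `addPluginsDir` are hypothetical names):

```java
import java.io.File;
import java.net.URI;
import java.util.Collection;
import java.util.Optional;

final class PluginsDirShipping {

    private PluginsDirShipping() {}

    /** Adds the plugins directory, converted to a URI, only when it is present. */
    static void addPluginsDir(Optional<File> pluginsDir, Collection<URI> effectiveShipFiles) {
        // map() is not applied to an empty Optional, so nothing is added in that case.
        pluginsDir.map(File::toURI).ifPresent(effectiveShipFiles::add);
    }
}
```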




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

